This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8e8e3df03 [GOBBLIN-1779] Ability to filter datasets that contain non
optional unions (#3648)
8e8e3df03 is described below
commit 8e8e3df03bf71ef42b0f774186fdc24288966294
Author: Matthew Ho <[email protected]>
AuthorDate: Thu Mar 9 14:01:32 2023 -0800
[GOBBLIN-1779] Ability to filter datasets that contain non optional unions
(#3648)
* [GOBBLIN-1779] Ability to filter datasets that contain non optional unions
* Address comments
---
.../data/management/dataset/DatasetUtils.java | 15 +-
.../dataset/DatasetsFinderFilteringDecorator.java | 108 ++++++++++++++
.../DatasetsFinderFilteringDecoratorTest.java | 156 +++++++++++++++++++++
.../DatasetHiveSchemaContainsNonOptionalUnion.java | 95 +++++++++++++
...asetHiveSchemaContainsNonOptionalUnionTest.java | 102 ++++++++++++++
.../iceberg/writer/HiveMetadataWriterTest.java | 12 +-
.../util/function/CheckedExceptionPredicate.java | 93 ++++++++++++
7 files changed, 571 insertions(+), 10 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
index 16386c222..37e789980 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
@@ -74,12 +74,19 @@ public class DatasetUtils {
* @throws IOException
*/
@SuppressWarnings("unchecked")
- public static <T extends org.apache.gobblin.dataset.Dataset>
DatasetsFinder<T> instantiateDatasetFinder(Properties props,
- FileSystem fs, String default_class, Object... additionalArgs)
+ public static <T extends org.apache.gobblin.dataset.Dataset>
DatasetsFinder<T> instantiateDatasetFinder(
+ Properties props, FileSystem fs, String default_class, Object...
additionalArgs)
throws IOException {
+ return instantiateDatasetFinder(DATASET_PROFILE_CLASS_KEY, props, fs,
default_class, additionalArgs);
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T extends org.apache.gobblin.dataset.Dataset>
DatasetsFinder<T> instantiateDatasetFinder(
+ String classKey, Properties props, FileSystem fs, String default_class,
Object... additionalArgs)
+ throws IOException{
String className = default_class;
- if (props.containsKey(DATASET_PROFILE_CLASS_KEY)) {
- className = props.getProperty(DATASET_PROFILE_CLASS_KEY);
+ if (props.containsKey(classKey)) {
+ className = props.getProperty(classKey);
}
try {
Class<?> datasetFinderClass = Class.forName(className);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
new file mode 100644
index 000000000..7c7f63799
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+
+
+/**
+ * A decorator for filtering datasets after a {@link DatasetsFinder} finds a
{@link List} of {@link Dataset}s
+ */
+public class DatasetsFinderFilteringDecorator<T extends Dataset> implements
DatasetsFinder<T> {
+ private static final String PREFIX = "filtering.datasets.finder.";
+ public static final String DATASET_CLASS = PREFIX + "class";
+ public static final String ALLOWED = PREFIX + "allowed.predicates";
+ public static final String DENIED = PREFIX + "denied.predicates";
+
+ protected DatasetsFinder<T> datasetFinder;
+ protected List<CheckedExceptionPredicate<T,IOException>>
allowDatasetPredicates;
+ protected List<CheckedExceptionPredicate<T,IOException>>
denyDatasetPredicates;
+
+ public DatasetsFinderFilteringDecorator(FileSystem fs, Properties
properties) throws IOException {
+ this.datasetFinder = DatasetUtils.instantiateDatasetFinder(
+ DATASET_CLASS, properties, fs,
DefaultFileSystemGlobFinder.class.getName());
+ this.allowDatasetPredicates = instantiatePredicates(ALLOWED, properties);
+ this.denyDatasetPredicates = instantiatePredicates(DENIED, properties);
+ }
+
+ @VisibleForTesting
+ DatasetsFinderFilteringDecorator(
+ DatasetsFinder<T> datasetsFinder,
+ List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates,
+ List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates) {
+ this.datasetFinder = datasetsFinder;
+ this.allowDatasetPredicates = allowDatasetPredicates;
+ this.denyDatasetPredicates = denyDatasetPredicates;
+ }
+
+ @Override
+ public List<T> findDatasets() throws IOException {
+ List<T> datasets = datasetFinder.findDatasets();
+ List<T> allowedDatasets = Collections.emptyList();
+ try {
+ allowedDatasets = datasets.parallelStream()
+ .filter(dataset -> allowDatasetPredicates.stream()
+ .map(CheckedExceptionPredicate::wrapToTunneled)
+ .allMatch(p -> p.test(dataset)))
+ .filter(dataset -> denyDatasetPredicates.stream()
+ .map(CheckedExceptionPredicate::wrapToTunneled)
+ .noneMatch(predicate -> predicate.test(dataset)))
+ .collect(Collectors.toList());
+ } catch (CheckedExceptionPredicate.WrappedIOException wrappedIOException) {
+ wrappedIOException.rethrowWrapped();
+ }
+
+ return allowedDatasets;
+ }
+
+ @Override
+ public Path commonDatasetRoot() {
+ return datasetFinder.commonDatasetRoot();
+ }
+
+ private List<CheckedExceptionPredicate<T,IOException>>
instantiatePredicates(String key, Properties props)
+ throws IOException {
+ List<CheckedExceptionPredicate<T,IOException>> predicates = new
ArrayList<>();
+ try {
+ for (String className : PropertiesUtils.getPropAsList(props, key)) {
+ predicates.add((CheckedExceptionPredicate<T, IOException>)
+ ConstructorUtils.invokeConstructor(Class.forName(className),
props));
+ }
+
+ return predicates;
+ } catch (ReflectiveOperationException e) {
+ throw new IOException(e);
+ }
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecoratorTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecoratorTest.java
new file mode 100644
index 000000000..99e0044ab
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecoratorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+import org.apache.hadoop.fs.FileSystem;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class DatasetsFinderFilteringDecoratorTest {
+ @Test
+ public void testFindDatasets_emptyAllowed() throws IOException {
+ DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+ Dataset mockDataset = Mockito.mock(Dataset.class);
+
Mockito.when(mockFinder.findDatasets()).thenReturn(Arrays.asList(mockDataset));
+
+ DatasetsFinderFilteringDecorator<Dataset> d = new
DatasetsFinderFilteringDecorator(
+ mockFinder,Collections.emptyList(), Collections.emptyList());
+ Assert.assertEquals(d.findDatasets(), Arrays.asList(mockDataset));
+ }
+
+ @Test
+ public void testFindDatasets_allowed() throws IOException {
+ DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+ Dataset mockDataset = Mockito.mock(Dataset.class);
+
Mockito.when(mockFinder.findDatasets()).thenReturn(Arrays.asList(mockDataset));
+
+ DatasetsFinderFilteringDecorator<Dataset> d = new
DatasetsFinderFilteringDecorator(
+ mockFinder,
+ Arrays.asList(new StubTrue(), new StubTrue()),
+ Arrays.asList(new StubFalse(), new StubFalse()));
+ Assert.assertEquals(d.findDatasets(), Arrays.asList(mockDataset));
+ }
+
+ @Test
+ public void testFindDatasets_denied() throws IOException {
+ DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+ Dataset mockDataset = Mockito.mock(Dataset.class);
+
Mockito.when(mockFinder.findDatasets()).thenReturn(Arrays.asList(mockDataset));
+
+ DatasetsFinderFilteringDecorator<Dataset> d = new
DatasetsFinderFilteringDecorator(mockFinder,
+ Arrays.asList(new StubTrue(), new StubFalse()),
+ Arrays.asList(new StubFalse()));
+ Assert.assertEquals(d.findDatasets(), Collections.emptyList());
+
+ d = new DatasetsFinderFilteringDecorator(mockFinder,
+ Arrays.asList(new StubTrue()),
+ Arrays.asList(new StubFalse(), new StubTrue()));
+ Assert.assertEquals(d.findDatasets(), Collections.emptyList());
+ }
+
+ @Test
+ public void testFindDatasets_throwsException() throws IOException {
+ DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+ Dataset mockDataset = Mockito.mock(Dataset.class);
+
Mockito.when(mockFinder.findDatasets()).thenReturn(Arrays.asList(mockDataset));
+
+ DatasetsFinderFilteringDecorator<Dataset> datasetFinder_1 = new
DatasetsFinderFilteringDecorator(mockFinder,
+ Arrays.asList(new StubTrue(), new ThrowsException()),
+ Arrays.asList(new StubFalse()));
+ Assert.assertThrows(IOException.class, datasetFinder_1::findDatasets);
+
+ DatasetsFinderFilteringDecorator<Dataset> datasetFinder_2 = new
DatasetsFinderFilteringDecorator(mockFinder,
+ Arrays.asList(new StubTrue()),
+ Arrays.asList(new StubFalse(), new ThrowsException()));
+ Assert.assertThrows(IOException.class, datasetFinder_2::findDatasets);
+ }
+
+ @Test
+ public void testInstantiationOfPredicatesAndDatasetFinder() throws
IOException {
+ DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+
+ Properties props = new Properties();
+ props.setProperty(DatasetsFinderFilteringDecorator.DATASET_CLASS,
mockFinder.getClass().getName());
+ props.setProperty(DatasetsFinderFilteringDecorator.ALLOWED,
StubTrue.class.getName());
+ props.setProperty(DatasetsFinderFilteringDecorator.DENIED,
StubFalse.class.getName());
+ DatasetsFinderFilteringDecorator<Dataset>
+ testFilterDataFinder = new
TestDatasetsFinderFilteringDecorator(Mockito.mock(FileSystem.class), props);
+
+ Assert.assertEquals(testFilterDataFinder.datasetFinder.getClass(),
mockFinder.getClass());
+
+ Assert.assertEquals(testFilterDataFinder.allowDatasetPredicates.size(), 1);
+ CheckedExceptionPredicate<Dataset, IOException> allowListPredicate =
testFilterDataFinder.allowDatasetPredicates.get(0);
+ Assert.assertEquals(allowListPredicate.getClass(), StubTrue.class);
+ Assert.assertEquals(((StubTrue) allowListPredicate).props, props);
+
+ Assert.assertEquals(testFilterDataFinder.denyDatasetPredicates.size(), 1);
+ CheckedExceptionPredicate<Dataset, IOException> denyListPredicate =
testFilterDataFinder.denyDatasetPredicates.get(0);
+ Assert.assertEquals(denyListPredicate.getClass(), StubFalse.class);
+ Assert.assertEquals(((StubFalse) denyListPredicate).props, props);
+ }
+
+ static class TestDatasetsFinderFilteringDecorator extends
DatasetsFinderFilteringDecorator<Dataset> {
+ public TestDatasetsFinderFilteringDecorator(FileSystem fs, Properties
properties) throws IOException {
+ super(fs, properties);
+ }
+ }
+
+ @AllArgsConstructor
+ static class StubTrue implements CheckedExceptionPredicate<Dataset,
IOException> {
+ Properties props;
+
+ StubTrue() {
+ this.props = null;
+ }
+
+ @Override
+ public boolean test(Dataset arg) throws IOException {
+ return true;
+ }
+ }
+
+ @AllArgsConstructor
+ static class StubFalse implements CheckedExceptionPredicate<Dataset,
IOException> {
+ Properties props;
+
+ StubFalse() {
+ this.props = null;
+ }
+
+ @Override
+ public boolean test(Dataset arg) throws IOException {
+ return false;
+ }
+ }
+
+ static class ThrowsException implements CheckedExceptionPredicate<Dataset,
IOException> {
+ @Override
+ public boolean test(Dataset arg) throws IOException {
+ throw new IOException("Throwing a test exception");
+ }
+ }
+}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
new file mode 100644
index 000000000..655c764c7
--- /dev/null
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.predicates;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Optional;
+
+import gobblin.configuration.State;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+
+
+/**
+ * Determines whether a dataset's Hive schema contains a non-optional union
+ */
+@Slf4j
+public class DatasetHiveSchemaContainsNonOptionalUnion<T extends Dataset>
implements CheckedExceptionPredicate<T, IOException> {
+ private final HiveRegister hiveRegister;
+ private final Pattern pattern;
+
+ public static final String PREFIX =
DatasetHiveSchemaContainsNonOptionalUnion.class.getName();
+ /**
+ * Property key for the regex matched against the whole dataset URN. The regex must have exactly two
capture groups: the 1st is taken as the DB and the 2nd as the table.
+ */
+ public static final String PATTERN = PREFIX + ".db.table.pattern";
+
+ public DatasetHiveSchemaContainsNonOptionalUnion(Properties properties) {
+ this.hiveRegister = getHiveRegister(new State(properties));
+ this.pattern = Pattern.compile(properties.getProperty(PATTERN));
+ }
+
+ @Override
+ public boolean test(T dataset) throws IOException {
+ Optional<HiveTable> hiveTable = getTable(dataset);
+ if (!hiveTable.isPresent()) {
+ log.error("No matching table for dataset={}", dataset);
+ return false;
+ }
+
+ return containsNonOptionalUnion(hiveTable.get());
+ }
+
+ private Optional<HiveTable> getTable(T dataset) throws IOException {
+ DbAndTable dbAndTable = getDbAndTable(dataset);
+ return this.hiveRegister.getTable(dbAndTable.getDb(),
dbAndTable.getTable());
+ }
+
+ private DbAndTable getDbAndTable(T dataset) {
+ Matcher m = pattern.matcher(dataset.getUrn());
+ if (!m.matches() || m.groupCount() != 2) {
+ throw new IllegalStateException(String.format("Dataset urn [%s] doesn't
follow expected pattern. " +
+ "Expected pattern = %s", dataset.getUrn(), pattern.pattern()));
+ }
+ return new DbAndTable(m.group(1), m.group(2));
+ }
+
+ boolean containsNonOptionalUnion(HiveTable table) {
+ return HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(table);
+ }
+
+ private HiveRegister getHiveRegister(State state){
+ return HiveRegister.get(state);
+ }
+
+ @Data
+ private static class DbAndTable {
+ private final String db;
+ private final String table;
+ }
+}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
new file mode 100644
index 000000000..33cee4929
--- /dev/null
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.predicates;
+
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(dependsOnGroups = "icebergMetadataWriterTest")
+public class DatasetHiveSchemaContainsNonOptionalUnionTest extends
HiveMetastoreTest {
+
+ private static String dbName = "dbname_" +
+
DatasetHiveSchemaContainsNonOptionalUnionTest.class.getSimpleName().toLowerCase();
+ private static File tmpDir;
+ private static State state;
+ private static String dbUri;
+ private static String testTable = "test_table";
+
+ @AfterSuite
+ public void clean() throws Exception {
+ FileUtils.forceDeleteOnExit(tmpDir);
+ }
+
+ @BeforeSuite
+ public void setup() throws Exception {
+ Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
+ startMetastore();
+ tmpDir = Files.createTempDir();
+ dbUri = String.format("%s/%s/%s", tmpDir.getAbsolutePath(),"metastore",
dbName);
+ try {
+ metastoreClient.getDatabase(dbName);
+ } catch (NoSuchObjectException e) {
+ metastoreClient.createDatabase(
+ new Database(dbName, "database", dbUri, Collections.emptyMap()));
+ }
+
+ final State serdeProps = new State();
+ final String avroSchema = "{\"type\": \"record\", \"name\":
\"TestEvent\",\"namespace\": \"test.namespace\", \"fields\": "
+ + "[{\"name\":\"fieldName\", \"type\": %s}]}";
+ serdeProps.setProp("avro.schema.literal", String.format(avroSchema,
"[\"string\", \"int\"]"));
+ HiveTable testTable = createTestHiveTable_Avro(serdeProps);
+ metastoreClient.createTable(HiveMetaStoreUtils.getTable(testTable));
+
+ state =
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
+ state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN,
"/data/(\\w+)/(\\w+)");
+ Assert.assertNotNull(metastoreClient.getTable(dbName,
DatasetHiveSchemaContainsNonOptionalUnionTest.testTable));
+ }
+
+ @Test
+ public void testContainsNonOptionalUnion() throws Exception {
+ DatasetHiveSchemaContainsNonOptionalUnion predicate = new
DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties());
+ Dataset dataset = new SimpleDatasetForTesting("/data/" + dbName + "/" +
testTable);
+ Assert.assertTrue(predicate.test(dataset));
+ }
+
+ private HiveTable createTestHiveTable_Avro(State props) {
+ HiveTable.Builder builder = new HiveTable.Builder();
+ HiveTable hiveTable =
builder.withDbName(dbName).withTableName(testTable).withProps(props).build();
+ hiveTable.setInputFormat(AvroContainerInputFormat.class.getName());
+ hiveTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
+ hiveTable.setSerDeType(AvroSerDe.class.getName());
+
+ // Serialize then deserialize as a quick way to set up the table object
+ Table table = HiveMetaStoreUtils.getTable(hiveTable);
+ return HiveMetaStoreUtils.getHiveTable(table);
+ }
+}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index cfcd50d28..4225ce5ce 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -46,8 +46,8 @@ import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.thrift.TException;
import org.mockito.Mockito;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
@@ -118,14 +118,14 @@ public class HiveMetadataWriterTest extends
HiveMetastoreTest {
IMetaStoreClient client;
private static TestHiveMetastore testHiveMetastore;
- @AfterClass
+ @AfterSuite
public void clean() throws Exception {
- //Finally stop the metaStore
- stopMetastore();
gobblinMCEWriter.close();
FileUtils.forceDeleteOnExit(tmpDir);
+ //Finally stop the metaStore
+ stopMetastore();
}
- @BeforeClass
+ @BeforeSuite
public void setUp() throws Exception {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
startMetastore();
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionPredicate.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionPredicate.java
new file mode 100644
index 000000000..26bb75327
--- /dev/null
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionPredicate.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.function;
+
+import java.io.IOException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Alternative to {@link Predicate} that handles wrapping (or tunneling) a
single checked {@link Exception} derived class.
+ * Based on and extremely similar to {@link CheckedExceptionFunction}.<br><br>
+ *
+ * At first glance, it appears these 2 classes could be generalized and
combined without the uncomfortable amount of duplication.
+ * But this is not possible to do cleanly because:
+ * <ul>
+ * <li> {@link Predicate} and {@link Function} are separate types with no
inheritance hierarchy relationship</li>
+ * <li>
+ * {@link
CheckedExceptionPredicate#wrapToUnchecked(CheckedExceptionPredicate)} returns a
{@link Predicate}
+ * but {@link
CheckedExceptionFunction#wrapToUnchecked(CheckedExceptionFunction)} returns a
{@link Function}, and
+ * Java does not support higher-kinded generics / type classes
(i.e. type parameters for types that are
+ * themselves parameterized), so the return type cannot be abstracted over
+ * </li>
+ * </ul>
+ */
+@FunctionalInterface
+public interface CheckedExceptionPredicate<T, E extends Exception> {
+ /**
+ * Wrapper to tunnel {@link IOException} as an unchecked exception that
would later be unwrapped via
+ * {@link WrappedIOException#rethrowWrapped()}. If there is no expectation of
unwrapping, this wrapper may simply add
+ * unnecessary obfuscation: instead use {@link
CheckedExceptionPredicate#wrapToUnchecked(CheckedExceptionPredicate)}
+ *
+ * BUMMER: specific {@link IOException} hard-coded because: "generic class
may not extend {@link java.lang.Throwable}"
+ */
+ @RequiredArgsConstructor
+ class WrappedIOException extends RuntimeException {
+ @Getter
+ private final IOException wrappedException;
+
+ /** CAUTION: if this be your intent, DO NOT FORGET! Being unchecked, the
compiler WILL NOT remind you. */
+ public void rethrowWrapped() throws IOException {
+ throw wrappedException;
+ }
+ }
+
+ boolean test(T arg) throws E;
+
+ /** @return {@link Predicate} proxy that catches any checked {@link
Exception} and rethrows it wrapped as {@link RuntimeException}; a {@link RuntimeException} is rethrown as-is */
+ static <T, E extends Exception> Predicate<T>
wrapToUnchecked(CheckedExceptionPredicate<T, E> f) {
+ return a -> {
+ try {
+ return f.test(a);
+ } catch (RuntimeException re) {
+ throw re; // no double wrapping
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+
+ /**
+ * @return {@link Predicate} proxy that catches any instance of {@link
IOException}, and rethrows it wrapped as {@link WrappedIOException},
+ * for easy unwrapping via {@link WrappedIOException#rethrowWrapped()}
+ */
+ static <T, E extends IOException> Predicate<T>
wrapToTunneled(CheckedExceptionPredicate<T, E> f) {
+ return a -> {
+ try {
+ return f.test(a);
+ } catch (RuntimeException re) {
+ throw re; // no double wrapping
+ } catch (IOException ioe) {
+ throw new WrappedIOException(ioe);
+ }
+ };
+ }
+}