This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e8e3df03 [GOBBLIN-1779] Ability to filter datasets that contain non 
optional unions (#3648)
8e8e3df03 is described below

commit 8e8e3df03bf71ef42b0f774186fdc24288966294
Author: Matthew Ho <[email protected]>
AuthorDate: Thu Mar 9 14:01:32 2023 -0800

    [GOBBLIN-1779] Ability to filter datasets that contain non optional unions 
(#3648)
    
    * [GOBBLIN-1779] Ability to filter datasets that contain non optional unions
    
    * Address comments
---
 .../data/management/dataset/DatasetUtils.java      |  15 +-
 .../dataset/DatasetsFinderFilteringDecorator.java  | 108 ++++++++++++++
 .../DatasetsFinderFilteringDecoratorTest.java      | 156 +++++++++++++++++++++
 .../DatasetHiveSchemaContainsNonOptionalUnion.java |  95 +++++++++++++
 ...asetHiveSchemaContainsNonOptionalUnionTest.java | 102 ++++++++++++++
 .../iceberg/writer/HiveMetadataWriterTest.java     |  12 +-
 .../util/function/CheckedExceptionPredicate.java   |  93 ++++++++++++
 7 files changed, 571 insertions(+), 10 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
index 16386c222..37e789980 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java
@@ -74,12 +74,19 @@ public class DatasetUtils {
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
-  public static <T extends org.apache.gobblin.dataset.Dataset> 
DatasetsFinder<T> instantiateDatasetFinder(Properties props,
-      FileSystem fs, String default_class, Object... additionalArgs)
+  public static <T extends org.apache.gobblin.dataset.Dataset> 
DatasetsFinder<T> instantiateDatasetFinder(
+      Properties props, FileSystem fs, String default_class, Object... 
additionalArgs)
       throws IOException {
+    return instantiateDatasetFinder(DATASET_PROFILE_CLASS_KEY, props, fs, 
default_class, additionalArgs);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends org.apache.gobblin.dataset.Dataset> 
DatasetsFinder<T> instantiateDatasetFinder(
+      String classKey, Properties props, FileSystem fs, String default_class, 
Object... additionalArgs)
+  throws IOException{
     String className = default_class;
-    if (props.containsKey(DATASET_PROFILE_CLASS_KEY)) {
-      className = props.getProperty(DATASET_PROFILE_CLASS_KEY);
+    if (props.containsKey(classKey)) {
+      className = props.getProperty(classKey);
     }
     try {
       Class<?> datasetFinderClass = Class.forName(className);
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
new file mode 100644
index 000000000..7c7f63799
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+
+
+/**
+ * A decorator for filtering datasets after a {@link DatasetsFinder} finds a 
{@link List} of {@link Dataset}s
+ */
+public class DatasetsFinderFilteringDecorator<T extends Dataset> implements 
DatasetsFinder<T> {
+  private static final String PREFIX = "filtering.datasets.finder.";
+  public static final String DATASET_CLASS = PREFIX + "class";
+  public static final String ALLOWED = PREFIX + "allowed.predicates";
+  public static final String DENIED = PREFIX + "denied.predicates";
+
+  protected DatasetsFinder<T> datasetFinder;
+  protected List<CheckedExceptionPredicate<T,IOException>> 
allowDatasetPredicates;
+  protected List<CheckedExceptionPredicate<T,IOException>> 
denyDatasetPredicates;
+
+  public DatasetsFinderFilteringDecorator(FileSystem fs, Properties 
properties) throws IOException {
+    this.datasetFinder = DatasetUtils.instantiateDatasetFinder(
+        DATASET_CLASS, properties, fs, 
DefaultFileSystemGlobFinder.class.getName());
+    this.allowDatasetPredicates = instantiatePredicates(ALLOWED, properties);
+    this.denyDatasetPredicates = instantiatePredicates(DENIED, properties);
+  }
+
+  @VisibleForTesting
+  DatasetsFinderFilteringDecorator(
+      DatasetsFinder<T> datasetsFinder,
+      List<CheckedExceptionPredicate<T,IOException>> allowDatasetPredicates,
+      List<CheckedExceptionPredicate<T,IOException>> denyDatasetPredicates) {
+    this.datasetFinder = datasetsFinder;
+    this.allowDatasetPredicates = allowDatasetPredicates;
+    this.denyDatasetPredicates = denyDatasetPredicates;
+  }
+
+  @Override
+  public List<T> findDatasets() throws IOException {
+    List<T> datasets = datasetFinder.findDatasets();
+    List<T> allowedDatasets = Collections.emptyList();
+    try {
+      allowedDatasets = datasets.parallelStream()
+          .filter(dataset -> allowDatasetPredicates.stream()
+              .map(CheckedExceptionPredicate::wrapToTunneled)
+              .allMatch(p -> p.test(dataset)))
+          .filter(dataset -> denyDatasetPredicates.stream()
+              .map(CheckedExceptionPredicate::wrapToTunneled)
+              .noneMatch(predicate -> predicate.test(dataset)))
+          .collect(Collectors.toList());
+    } catch (CheckedExceptionPredicate.WrappedIOException wrappedIOException) {
+      wrappedIOException.rethrowWrapped();
+    }
+
+    return allowedDatasets;
+  }
+
+  @Override
+  public Path commonDatasetRoot() {
+    return datasetFinder.commonDatasetRoot();
+  }
+
+  private List<CheckedExceptionPredicate<T,IOException>> 
instantiatePredicates(String key, Properties props)
+      throws IOException {
+    List<CheckedExceptionPredicate<T,IOException>> predicates = new 
ArrayList<>();
+    try {
+      for (String className : PropertiesUtils.getPropAsList(props, key)) {
+        predicates.add((CheckedExceptionPredicate<T, IOException>)
+            ConstructorUtils.invokeConstructor(Class.forName(className), 
props));
+      }
+
+      return predicates;
+    } catch (ReflectiveOperationException e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecoratorTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecoratorTest.java
new file mode 100644
index 000000000..99e0044ab
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecoratorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import lombok.AllArgsConstructor;
+import org.apache.gobblin.dataset.DatasetsFinder;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+import org.apache.hadoop.fs.FileSystem;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit tests for {@link DatasetsFinderFilteringDecorator}, driven by a mocked
+ * {@link DatasetsFinder} and the stub predicates declared at the bottom of
+ * this class.
+ */
+public class DatasetsFinderFilteringDecoratorTest {
+  /** With no allow and no deny predicates the finder's result is unchanged. */
+  @Test
+  public void testFindDatasets_emptyAllowed() throws IOException {
+    DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+    Dataset mockDataset = Mockito.mock(Dataset.class);
+    
Mockito.when(mockFinder.findDatasets()).thenReturn(Arrays.asList(mockDataset));
+
+    DatasetsFinderFilteringDecorator<Dataset> d = new 
DatasetsFinderFilteringDecorator(
+        mockFinder,Collections.emptyList(), Collections.emptyList());
+    Assert.assertEquals(d.findDatasets(), Arrays.asList(mockDataset));
+  }
+
+  /**
+   * A dataset is kept when every allow predicate returns true and every deny
+   * predicate returns false.
+   */
+  @Test
+  public void testFindDatasets_allowed() throws IOException {
+    DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+    Dataset mockDataset = Mockito.mock(Dataset.class);
+    
Mockito.when(mockFinder.findDatasets()).thenReturn(Arrays.asList(mockDataset));
+
+    DatasetsFinderFilteringDecorator<Dataset> d = new 
DatasetsFinderFilteringDecorator(
+        mockFinder,
+        Arrays.asList(new StubTrue(), new StubTrue()),
+        Arrays.asList(new StubFalse(), new StubFalse()));
+    Assert.assertEquals(d.findDatasets(), Arrays.asList(mockDataset));
+  }
+
+  /**
+   * A dataset is dropped when any allow predicate returns false (first case)
+   * or any deny predicate returns true (second case).
+   */
+  @Test
+  public void testFindDatasets_denied() throws IOException {
+    DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+    Dataset mockDataset = Mockito.mock(Dataset.class);
+    
Mockito.when(mockFinder.findDatasets()).thenReturn(Arrays.asList(mockDataset));
+
+    DatasetsFinderFilteringDecorator<Dataset> d = new 
DatasetsFinderFilteringDecorator(mockFinder,
+        Arrays.asList(new StubTrue(), new StubFalse()),
+        Arrays.asList(new StubFalse()));
+    Assert.assertEquals(d.findDatasets(), Collections.emptyList());
+
+    d = new DatasetsFinderFilteringDecorator(mockFinder,
+        Arrays.asList(new StubTrue()),
+        Arrays.asList(new StubFalse(), new StubTrue()));
+    Assert.assertEquals(d.findDatasets(), Collections.emptyList());
+  }
+
+  /**
+   * An {@link IOException} thrown by a predicate surfaces from
+   * {@code findDatasets()}, whether the predicate sits in the allow list
+   * (first case) or the deny list (second case).
+   */
+  @Test
+  public void testFindDatasets_throwsException() throws IOException {
+    DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+    Dataset mockDataset = Mockito.mock(Dataset.class);
+    
Mockito.when(mockFinder.findDatasets()).thenReturn(Arrays.asList(mockDataset));
+
+    DatasetsFinderFilteringDecorator<Dataset> datasetFinder_1 = new 
DatasetsFinderFilteringDecorator(mockFinder,
+        Arrays.asList(new StubTrue(), new ThrowsException()),
+        Arrays.asList(new StubFalse()));
+    Assert.assertThrows(IOException.class, datasetFinder_1::findDatasets);
+
+    DatasetsFinderFilteringDecorator<Dataset> datasetFinder_2 = new 
DatasetsFinderFilteringDecorator(mockFinder,
+        Arrays.asList(new StubTrue()),
+        Arrays.asList(new StubFalse(), new ThrowsException()));
+    Assert.assertThrows(IOException.class, datasetFinder_2::findDatasets);
+  }
+
+  /**
+   * The properties-based constructor instantiates the configured finder class
+   * and one predicate per configured class name, and each predicate receives
+   * the same {@link Properties} object.
+   */
+  @Test
+  public void testInstantiationOfPredicatesAndDatasetFinder() throws 
IOException {
+    DatasetsFinder<Dataset> mockFinder = Mockito.mock(DatasetsFinder.class);
+
+    Properties props = new Properties();
+    // The mock's generated class name is used as the finder class to load.
+    props.setProperty(DatasetsFinderFilteringDecorator.DATASET_CLASS, 
mockFinder.getClass().getName());
+    props.setProperty(DatasetsFinderFilteringDecorator.ALLOWED, 
StubTrue.class.getName());
+    props.setProperty(DatasetsFinderFilteringDecorator.DENIED, 
StubFalse.class.getName());
+    DatasetsFinderFilteringDecorator<Dataset>
+        testFilterDataFinder = new 
TestDatasetsFinderFilteringDecorator(Mockito.mock(FileSystem.class), props);
+
+    Assert.assertEquals(testFilterDataFinder.datasetFinder.getClass(), 
mockFinder.getClass());
+
+    Assert.assertEquals(testFilterDataFinder.allowDatasetPredicates.size(), 1);
+    CheckedExceptionPredicate<Dataset, IOException> allowListPredicate = 
testFilterDataFinder.allowDatasetPredicates.get(0);
+    Assert.assertEquals(allowListPredicate.getClass(), StubTrue.class);
+    Assert.assertEquals(((StubTrue) allowListPredicate).props, props);
+
+    Assert.assertEquals(testFilterDataFinder.denyDatasetPredicates.size(), 1);
+    CheckedExceptionPredicate<Dataset, IOException> denyListPredicate = 
testFilterDataFinder.denyDatasetPredicates.get(0);
+    Assert.assertEquals(denyListPredicate.getClass(), StubFalse.class);
+    Assert.assertEquals(((StubFalse) denyListPredicate).props, props);
+  }
+
+  /** Concrete subclass that pins the decorator's type parameter to Dataset. */
+  static class TestDatasetsFinderFilteringDecorator extends 
DatasetsFinderFilteringDecorator<Dataset> {
+    public TestDatasetsFinderFilteringDecorator(FileSystem fs, Properties 
properties) throws IOException {
+      super(fs, properties);
+    }
+  }
+
+  /**
+   * Predicate that accepts every dataset. The Lombok all-args constructor
+   * takes the {@link Properties}; the no-arg constructor leaves them null.
+   */
+  @AllArgsConstructor
+  static class StubTrue implements CheckedExceptionPredicate<Dataset, 
IOException> {
+    Properties props;
+
+    StubTrue() {
+      this.props = null;
+    }
+
+    @Override
+    public boolean test(Dataset arg) throws IOException {
+      return true;
+    }
+  }
+
+  /**
+   * Predicate that rejects every dataset. The Lombok all-args constructor
+   * takes the {@link Properties}; the no-arg constructor leaves them null.
+   */
+  @AllArgsConstructor
+  static class StubFalse implements CheckedExceptionPredicate<Dataset, 
IOException> {
+    Properties props;
+
+    StubFalse() {
+      this.props = null;
+    }
+
+    @Override
+    public boolean test(Dataset arg) throws IOException {
+      return false;
+    }
+  }
+
+  /** Predicate that always fails with an {@link IOException}. */
+  static class ThrowsException implements CheckedExceptionPredicate<Dataset, 
IOException> {
+    @Override
+    public boolean test(Dataset arg) throws IOException {
+      throw new IOException("Throwing a test exception");
+    }
+  }
+}
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
new file mode 100644
index 000000000..655c764c7
--- /dev/null
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.predicates;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Optional;
+
+import gobblin.configuration.State;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.hive.HiveRegister;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.function.CheckedExceptionPredicate;
+
+
+/**
+ * Determines if a dataset's hive schema contains a non optional union
+ */
+@Slf4j
+public class DatasetHiveSchemaContainsNonOptionalUnion<T extends Dataset> 
implements CheckedExceptionPredicate<T, IOException> {
+  private final HiveRegister hiveRegister;
+  private final Pattern pattern;
+
+  public static final String PREFIX = 
DatasetHiveSchemaContainsNonOptionalUnion.class.getName();
+  /**
+   * 1st match group is assumed to be the DB and the 2nd match group the Table 
for the pattern
+   */
+  public static final String PATTERN = PREFIX + ".db.table.pattern";
+
+  public DatasetHiveSchemaContainsNonOptionalUnion(Properties properties) {
+    this.hiveRegister = getHiveRegister(new State(properties));
+    this.pattern = Pattern.compile(properties.getProperty(PATTERN));
+  }
+
+  @Override
+  public boolean test(T dataset) throws IOException {
+    Optional<HiveTable> hiveTable = getTable(dataset);
+    if (!hiveTable.isPresent()) {
+      log.error("No matching table for dataset={}", dataset);
+      return false;
+    }
+
+    return containsNonOptionalUnion(hiveTable.get());
+  }
+
+  private Optional<HiveTable> getTable(T dataset) throws IOException {
+    DbAndTable dbAndTable = getDbAndTable(dataset);
+    return this.hiveRegister.getTable(dbAndTable.getDb(), 
dbAndTable.getTable());
+  }
+
+  private DbAndTable getDbAndTable(T dataset) {
+    Matcher m = pattern.matcher(dataset.getUrn());
+    if (!m.matches() || m.groupCount() != 2) {
+      throw new IllegalStateException(String.format("Dataset urn [%s] doesn't 
follow expected pattern. " +
+      "Expected pattern = %s", dataset.getUrn(), pattern.pattern()));
+    }
+    return new DbAndTable(m.group(1), m.group(2));
+  }
+
+  boolean containsNonOptionalUnion(HiveTable table) {
+    return HiveMetaStoreUtils.containsNonOptionalUnionTypeColumn(table);
+  }
+
+  private HiveRegister getHiveRegister(State state){
+    return HiveRegister.get(state);
+  }
+
+  @Data
+  private static class DbAndTable {
+    private final String db;
+    private final String table;
+  }
+}
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
new file mode 100644
index 000000000..33cee4929
--- /dev/null
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.iceberg.predicates;
+
+import com.google.common.io.Files;
+import java.io.File;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link DatasetHiveSchemaContainsNonOptionalUnion} against the
+ * metastore started through {@link HiveMetastoreTest}. Declared to run after
+ * the {@code icebergMetadataWriterTest} group.
+ */
+@Slf4j
+@Test(dependsOnGroups = "icebergMetadataWriterTest")
+public class DatasetHiveSchemaContainsNonOptionalUnionTest extends 
HiveMetastoreTest {
+
+  private static String dbName = "dbname_" +
+      
DatasetHiveSchemaContainsNonOptionalUnionTest.class.getSimpleName().toLowerCase();
+  private static File tmpDir;
+  private static State state;
+  private static String dbUri;
+  private static String testTable = "test_table";
+
+  /** Schedules the temporary directory for deletion when the JVM exits. */
+  @AfterSuite
+  public void clean() throws Exception {
+    FileUtils.forceDeleteOnExit(tmpDir);
+  }
+
+  /**
+   * Starts the metastore, creates the test database if it does not exist,
+   * registers an Avro-backed table whose single field is the union
+   * {@code ["string", "int"]}, and prepares the predicate configuration.
+   */
+  @BeforeSuite
+  public void setup() throws Exception {
+    Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
+    startMetastore();
+    tmpDir = Files.createTempDir();
+    dbUri = String.format("%s/%s/%s", tmpDir.getAbsolutePath(),"metastore", 
dbName);
+    try {
+      metastoreClient.getDatabase(dbName);
+    } catch (NoSuchObjectException e) {
+      metastoreClient.createDatabase(
+          new Database(dbName, "database", dbUri, Collections.emptyMap()));
+    }
+
+    final State serdeProps = new State();
+    final String avroSchema = "{\"type\": \"record\", \"name\": 
\"TestEvent\",\"namespace\": \"test.namespace\", \"fields\": "
+        + "[{\"name\":\"fieldName\", \"type\": %s}]}";
+    serdeProps.setProp("avro.schema.literal", String.format(avroSchema, 
"[\"string\", \"int\"]"));
+    // NOTE: this local shadows the static String field testTable, which is
+    // why the field is referenced with the class-qualified name below.
+    HiveTable testTable = createTestHiveTable_Avro(serdeProps);
+    metastoreClient.createTable(HiveMetaStoreUtils.getTable(testTable));
+
+    state = 
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
+    // Group 1 of the pattern is the database, group 2 the table.
+    state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN, 
"/data/(\\w+)/(\\w+)");
+    Assert.assertNotNull(metastoreClient.getTable(dbName, 
DatasetHiveSchemaContainsNonOptionalUnionTest.testTable));
+  }
+
+  /**
+   * The predicate accepts a dataset whose URN resolves to the table created
+   * in {@link #setup()}.
+   */
+  @Test
+  public void testContainsNonOptionalUnion() throws Exception {
+    DatasetHiveSchemaContainsNonOptionalUnion predicate = new 
DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties());
+    Dataset dataset = new SimpleDatasetForTesting("/data/" + dbName + "/" + 
testTable);
+    Assert.assertTrue(predicate.test(dataset));
+  }
+
+  /**
+   * Builds an Avro-format {@link HiveTable} named after the static
+   * {@code dbName} and {@code testTable} fields, carrying {@code props}.
+   */
+  private HiveTable createTestHiveTable_Avro(State props) {
+    HiveTable.Builder builder = new HiveTable.Builder();
+    HiveTable hiveTable = 
builder.withDbName(dbName).withTableName(testTable).withProps(props).build();
+    hiveTable.setInputFormat(AvroContainerInputFormat.class.getName());
+    hiveTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
+    hiveTable.setSerDeType(AvroSerDe.class.getName());
+
+    // Serialize then deserialize as a way to quickly setup table object
+    Table table = HiveMetaStoreUtils.getTable(hiveTable);
+    return HiveMetaStoreUtils.getHiveTable(table);
+  }
+}
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index cfcd50d28..4225ce5ce 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -46,8 +46,8 @@ import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.thrift.TException;
 import org.mockito.Mockito;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
@@ -118,14 +118,14 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
   IMetaStoreClient client;
   private static TestHiveMetastore testHiveMetastore;
 
-  @AfterClass
+  @AfterSuite
   public void clean() throws Exception {
-    //Finally stop the metaStore
-    stopMetastore();
     gobblinMCEWriter.close();
     FileUtils.forceDeleteOnExit(tmpDir);
+    //Finally stop the metaStore
+    stopMetastore();
   }
-  @BeforeClass
+  @BeforeSuite
   public void setUp() throws Exception {
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
     startMetastore();
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionPredicate.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionPredicate.java
new file mode 100644
index 000000000..26bb75327
--- /dev/null
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionPredicate.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.function;
+
+import java.io.IOException;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Alternative to {@link Predicate} that handles wrapping (or tunneling) a
+ * single checked {@link Exception} derived class.
+ * Based on and extremely similar to {@link CheckedExceptionFunction}.<br><br>
+ *
+ * At first glance, it appears these 2 classes could be generalized and
+ * combined without the uncomfortable amount of duplication.
+ * But this is not possible to do cleanly because:
+ *   <ul>
+ *     <li> {@link Predicate} and {@link Function} are separate types with no
+ *       inheritance hierarchy relationship</li>
+ *     <li>
+ *       {@link CheckedExceptionPredicate#wrapToUnchecked(CheckedExceptionPredicate)}
+ *       returns a {@link Predicate} but
+ *       {@link CheckedExceptionFunction#wrapToUnchecked(CheckedExceptionFunction)}
+ *       returns a {@link Function}, and Java does not support higher level
+ *       generics / type classes (i.e. type parameters for types that are
+ *       themselves parameterized), so one generic wrapper cannot abstract
+ *       over both return types
+ *     </li>
+ *   </ul>
+ *
+ * @param <T> type of the argument tested by the predicate
+ * @param <E> checked exception type that {@link #test(Object)} may throw
+ */
+@FunctionalInterface
+public interface CheckedExceptionPredicate<T, E extends Exception> {
+  /**
+   * Wrapper to tunnel {@link IOException} as an unchecked exception that
+   * would later be unwrapped via {@link WrappedIOException#rethrowWrapped()}.
+   * If there is no expectation of unwrapping, this wrapper may simply add
+   * unnecessary obfuscation: instead use
+   * {@link CheckedExceptionPredicate#wrapToUnchecked(CheckedExceptionPredicate)}
+   *
+   * BUMMER: specific {@link IOException} hard-coded because: "generic class
+   * may not extend {@link java.lang.Throwable}"
+   */
+  @RequiredArgsConstructor
+  class WrappedIOException extends RuntimeException {
+    @Getter
+    private final IOException wrappedException;
+
+    /**
+     * Throws the tunneled {@link IOException}; this method never returns
+     * normally.
+     * CAUTION: if this be your intent, DO NOT FORGET!  Being unchecked, the
+     * compiler WILL NOT remind you.
+     */
+    public void rethrowWrapped() throws IOException {
+      throw wrappedException;
+    }
+  }
+
+  /**
+   * Evaluates this predicate on the given argument.
+   *
+   * @param arg the value to test
+   * @return the predicate's verdict for {@code arg}
+   * @throws E if evaluation fails
+   */
+  boolean test(T arg) throws E;
+
+  /**
+   * @return {@link Predicate} proxy that rethrows any checked
+   * {@link Exception} wrapped as {@link RuntimeException}; a
+   * {@link RuntimeException} thrown by {@code f} is rethrown as-is
+   */
+  static <T,  E extends Exception> Predicate<T> 
wrapToUnchecked(CheckedExceptionPredicate<T, E> f) {
+    return a -> {
+      try {
+        return f.test(a);
+      } catch (RuntimeException re) {
+        throw re; // no double wrapping
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  /**
+   * @return {@link Predicate} proxy that catches any instance of
+   * {@link IOException}, and rethrows it wrapped as
+   * {@link WrappedIOException}, for easy unwrapping via
+   * {@link WrappedIOException#rethrowWrapped()}; a {@link RuntimeException}
+   * thrown by {@code f} is rethrown as-is
+   */
+  static <T, E extends IOException> Predicate<T> 
wrapToTunneled(CheckedExceptionPredicate<T, E> f) {
+    return a -> {
+      try {
+        return f.test(a);
+      } catch (RuntimeException re) {
+        throw re; // no double wrapping
+      } catch (IOException ioe) {
+        throw new WrappedIOException(ioe);
+      }
+    };
+  }
+}

Reply via email to