This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 916d570a7e [GOBBLIN-2167] Allow filtering of Hive datasets by underlying HDFS folder location (#4069)
916d570a7e is described below

commit 916d570a7eb3db291a5f7af5dea3a1b75d8fff9e
Author: William Lo <[email protected]>
AuthorDate: Mon Oct 21 23:22:34 2024 -0400

    [GOBBLIN-2167] Allow filtering of Hive datasets by underlying HDFS folder location (#4069)
    
    * Add regex filter for table based on location
---
 .../management/copy/hive/HiveDatasetFinder.java    | 32 +++++++++++----
 .../copy/hive/HiveDatasetFinderTest.java           | 45 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
index a73ae70486..b3a3c846e9 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.data.management.copy.hive;
 
-import com.google.common.base.Throwables;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URISyntaxException;
@@ -25,12 +24,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
-
-import javax.annotation.Nonnull;
-
-import lombok.Data;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -43,12 +37,18 @@ import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.annotation.Nonnull;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.config.client.ConfigClient;
 import org.apache.gobblin.config.client.ConfigClientCache;
 import org.apache.gobblin.config.client.ConfigClientUtils;
@@ -80,6 +80,9 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
   public static final String DEFAULT_TABLE_PATTERN = "*";
   public static final String TABLE_FILTER = HIVE_DATASET_PREFIX + 
".tableFilter";
 
+  // Property used to filter tables only physically within a folder, 
represented by a regex
+  public static final String TABLE_FOLDER_ALLOWLIST_FILTER = 
HIVE_DATASET_PREFIX + ".tableFolderAllowlistFilter";
+
   /*
    * By setting the prefix, only config keys with this prefix will be used to 
build a HiveDataset.
    * By passing scoped configurations the same config keys can be used in 
different contexts.
@@ -118,6 +121,8 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
   protected final Function<Table, String> configStoreDatasetUriBuilder;
   protected final Optional<Predicate<Table>> tableFilter;
 
+  protected final Optional<Pattern> tableFolderAllowlistRegex;
+
   protected final String datasetConfigPrefix;
   protected final ConfigClient configClient;
   private final Config jobConfig;
@@ -194,6 +199,8 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
     } else {
       this.tableFilter = Optional.absent();
     }
+    this.tableFolderAllowlistRegex = 
properties.containsKey(TABLE_FOLDER_ALLOWLIST_FILTER) ?
+        
Optional.of(Pattern.compile(properties.getProperty(TABLE_FOLDER_ALLOWLIST_FILTER))):
 Optional.absent();
   }
 
   protected static HiveMetastoreClientPool createClientPool(Properties 
properties) throws IOException {
@@ -262,7 +269,10 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
 
           try (AutoReturnableObject<IMetaStoreClient> client = 
HiveDatasetFinder.this.clientPool.getClient()) {
             Table table = client.get().getTable(dbAndTable.getDb(), 
dbAndTable.getTable());
-            if (tableFilter.isPresent() && !tableFilter.get().apply(table)) {
+            if ((tableFilter.isPresent() && !tableFilter.get().apply(table))
+                || !shouldAllowTableLocation(tableFolderAllowlistRegex, 
table)) {
+              log.info("Ignoring table {} as its underlying location {} does 
not pass allowlist regex {}", dbAndTable,
+                  table.getSd().getLocation(), 
tableFolderAllowlistRegex.get());
               continue;
             }
 
@@ -294,6 +304,12 @@ public class HiveDatasetFinder implements IterableDatasetFinder<HiveDataset> {
     };
   }
 
+  protected static boolean shouldAllowTableLocation(Optional<Pattern> regex, 
Table table) {
+    if (!regex.isPresent()) {
+      return true;
+    }
+    return regex.get().matcher(table.getSd().getLocation()).matches();
+  }
 
   /**
    * @deprecated Use {@link #createHiveDataset(Table, Config)} instead
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java
index 0794520255..a9805d3954 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java
@@ -215,6 +215,51 @@ public class HiveDatasetFinderTest {
 
   }
 
+  @Test
+  public void testHiveTableFolderAllowlistFilter() throws Exception {
+    List<HiveDatasetFinder.DbAndTable> dbAndTables = Lists.newArrayList();
+    dbAndTables.add(new HiveDatasetFinder.DbAndTable("db1", "table1"));
+    // This table is created on /tmp/test
+    HiveMetastoreClientPool pool = getTestPool(dbAndTables);
+
+    Properties properties = new Properties();
+    properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + 
WhitelistBlacklist.WHITELIST, "");
+    // Try a regex with multiple groups
+    properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, 
"(/tmp/|a).*");
+
+    HiveDatasetFinder finder = new 
TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, 
pool);
+    List<HiveDataset> datasets = 
Lists.newArrayList(finder.getDatasetsIterator());
+
+    Assert.assertEquals(datasets.size(), 1);
+
+    properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + 
WhitelistBlacklist.WHITELIST, "");
+    // The table located at /tmp/test should be filtered
+    properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "/a/b");
+
+    finder = new TestHiveDatasetFinder(FileSystem.getLocal(new 
Configuration()), properties, pool);
+    datasets = Lists.newArrayList(finder.getDatasetsIterator());
+
+    Assert.assertEquals(datasets.size(), 0);
+
+    // Test empty filter
+    properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + 
WhitelistBlacklist.WHITELIST, "");
+    // The table located at /tmp/test should be filtered
+    properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "");
+
+    finder = new TestHiveDatasetFinder(FileSystem.getLocal(new 
Configuration()), properties, pool);
+    datasets = Lists.newArrayList(finder.getDatasetsIterator());
+
+    Assert.assertEquals(datasets.size(), 0);
+
+    // Test no regex config
+    properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + 
WhitelistBlacklist.WHITELIST, "");
+
+    finder = new TestHiveDatasetFinder(FileSystem.getLocal(new 
Configuration()), properties, pool);
+    datasets = Lists.newArrayList(finder.getDatasetsIterator());
+
+    Assert.assertEquals(datasets.size(), 0);
+  }
+
   private HiveMetastoreClientPool 
getTestPool(List<HiveDatasetFinder.DbAndTable> dbAndTables) throws Exception {
 
     SetMultimap<String, String> entities = HashMultimap.create();

Reply via email to