This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 916d570a7e [GOBBLIN-2167] Allow filtering of Hive datasets by
underlying HDFS folder location (#4069)
916d570a7e is described below
commit 916d570a7eb3db291a5f7af5dea3a1b75d8fff9e
Author: William Lo <[email protected]>
AuthorDate: Mon Oct 21 23:22:34 2024 -0400
[GOBBLIN-2167] Allow filtering of Hive datasets by underlying HDFS folder
location (#4069)
* Add regex filter for table based on location
---
.../management/copy/hive/HiveDatasetFinder.java | 32 +++++++++++----
.../copy/hive/HiveDatasetFinderTest.java | 45 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 8 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
index a73ae70486..b3a3c846e9 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.data.management.copy.hive;
-import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
@@ -25,12 +24,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-
-import javax.annotation.Nonnull;
-
-import lombok.Data;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
+import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -43,12 +37,18 @@ import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import javax.annotation.Nonnull;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientCache;
import org.apache.gobblin.config.client.ConfigClientUtils;
@@ -80,6 +80,9 @@ public class HiveDatasetFinder implements
IterableDatasetFinder<HiveDataset> {
public static final String DEFAULT_TABLE_PATTERN = "*";
public static final String TABLE_FILTER = HIVE_DATASET_PREFIX +
".tableFilter";
+ // Regex that a table's underlying storage location must fully match for the table to be included
+ public static final String TABLE_FOLDER_ALLOWLIST_FILTER =
HIVE_DATASET_PREFIX + ".tableFolderAllowlistFilter";
+
/*
* By setting the prefix, only config keys with this prefix will be used to
build a HiveDataset.
* By passing scoped configurations the same config keys can be used in
different contexts.
@@ -118,6 +121,8 @@ public class HiveDatasetFinder implements
IterableDatasetFinder<HiveDataset> {
protected final Function<Table, String> configStoreDatasetUriBuilder;
protected final Optional<Predicate<Table>> tableFilter;
+ protected final Optional<Pattern> tableFolderAllowlistRegex;
+
protected final String datasetConfigPrefix;
protected final ConfigClient configClient;
private final Config jobConfig;
@@ -194,6 +199,8 @@ public class HiveDatasetFinder implements
IterableDatasetFinder<HiveDataset> {
} else {
this.tableFilter = Optional.absent();
}
+ this.tableFolderAllowlistRegex =
properties.containsKey(TABLE_FOLDER_ALLOWLIST_FILTER) ?
+
Optional.of(Pattern.compile(properties.getProperty(TABLE_FOLDER_ALLOWLIST_FILTER))):
Optional.absent();
}
protected static HiveMetastoreClientPool createClientPool(Properties
properties) throws IOException {
@@ -262,7 +269,10 @@ public class HiveDatasetFinder implements
IterableDatasetFinder<HiveDataset> {
try (AutoReturnableObject<IMetaStoreClient> client =
HiveDatasetFinder.this.clientPool.getClient()) {
Table table = client.get().getTable(dbAndTable.getDb(),
dbAndTable.getTable());
- if (tableFilter.isPresent() && !tableFilter.get().apply(table)) {
+ // Evaluate the two filters separately. The allowlist regex is optional, so it may only be
+ // dereferenced after the location check rejected the table (which implies the regex is present).
+ // Log arguments are evaluated eagerly, so calling get() on an absent Optional would throw.
+ if (tableFilter.isPresent() && !tableFilter.get().apply(table)) {
+   log.info("Ignoring table {} as it does not pass the table filter", dbAndTable);
+   continue;
+ }
+ if (!shouldAllowTableLocation(tableFolderAllowlistRegex, table)) {
+   log.info("Ignoring table {} as its underlying location {} does not pass allowlist regex {}", dbAndTable,
+       table.getSd().getLocation(), tableFolderAllowlistRegex.get());
continue;
}
@@ -294,6 +304,12 @@ public class HiveDatasetFinder implements
IterableDatasetFinder<HiveDataset> {
};
}
+ /**
+  * Decides whether a table passes the optional folder allowlist.
+  *
+  * @param regex allowlist pattern; when absent every table is allowed
+  * @param table Hive table whose storage descriptor location is tested
+  * @return {@code true} if no pattern is configured or the table location matches the pattern in full;
+  *         {@code false} otherwise, including when the table has no storage descriptor or location
+  */
+ protected static boolean shouldAllowTableLocation(Optional<Pattern> regex, Table table) {
+   if (!regex.isPresent()) {
+     return true;
+   }
+   // A table without a storage location cannot lie inside an allowlisted folder, and
+   // Pattern.matcher(null) would throw a NullPointerException.
+   if (table.getSd() == null || table.getSd().getLocation() == null) {
+     return false;
+   }
+   return regex.get().matcher(table.getSd().getLocation()).matches();
+ }
/**
* @deprecated Use {@link #createHiveDataset(Table, Config)} instead
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java
index 0794520255..a9805d3954 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinderTest.java
@@ -215,6 +215,51 @@ public class HiveDatasetFinderTest {
}
+ /**
+  * Exercises the table folder allowlist regex against a single table located at /tmp/test:
+  * a matching regex keeps the table, a non-matching or empty regex drops it, and removing
+  * the property altogether disables the location filter.
+  */
+ @Test
+ public void testHiveTableFolderAllowlistFilter() throws Exception {
+   List<HiveDatasetFinder.DbAndTable> dbAndTables = Lists.newArrayList();
+   dbAndTables.add(new HiveDatasetFinder.DbAndTable("db1", "table1"));
+   // This table is created on /tmp/test
+   HiveMetastoreClientPool pool = getTestPool(dbAndTables);
+
+   Properties properties = new Properties();
+   properties.put(HiveDatasetFinder.HIVE_DATASET_PREFIX + "." + WhitelistBlacklist.WHITELIST, "");
+   // Try a regex with multiple groups
+   properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "(/tmp/|a).*");
+
+   HiveDatasetFinder finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
+   List<HiveDataset> datasets = Lists.newArrayList(finder.getDatasetsIterator());
+
+   Assert.assertEquals(datasets.size(), 1);
+
+   // The table located at /tmp/test should be filtered
+   properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "/a/b");
+
+   finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
+   datasets = Lists.newArrayList(finder.getDatasetsIterator());
+
+   Assert.assertEquals(datasets.size(), 0);
+
+   // Test empty filter: an empty regex only matches an empty location, so the table is filtered
+   properties.put(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER, "");
+
+   finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
+   datasets = Lists.newArrayList(finder.getDatasetsIterator());
+
+   Assert.assertEquals(datasets.size(), 0);
+
+   // Test no regex config: the property must be removed, otherwise the empty regex above still applies
+   properties.remove(HiveDatasetFinder.TABLE_FOLDER_ALLOWLIST_FILTER);
+
+   finder = new TestHiveDatasetFinder(FileSystem.getLocal(new Configuration()), properties, pool);
+   datasets = Lists.newArrayList(finder.getDatasetsIterator());
+
+   // Without an allowlist regex the location filter is disabled, so the table is returned
+   Assert.assertEquals(datasets.size(), 1);
+ }
+
private HiveMetastoreClientPool
getTestPool(List<HiveDatasetFinder.DbAndTable> dbAndTables) throws Exception {
SetMultimap<String, String> entities = HashMultimap.create();