Repository: incubator-gobblin Updated Branches: refs/heads/master 0975312c7 -> d9d7d5f0c
[GOBBLIN-173] Add pattern support for job-level blacklist in distcpNG/replication Add pattern support for job-level blacklist in distcpNG/replication Move the job-level blacklist into ConfigBasedDataset.java, totally separated from configStore clarify the semantics of two-level black/whitelist Closes #2015 from autumnust/blacklistPattern Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d9d7d5f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d9d7d5f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d9d7d5f0 Branch: refs/heads/master Commit: d9d7d5f0c53c35f1cb77dbfe2f41b568a9c3ba30 Parents: 0975312 Author: Lei Sun <[email protected]> Authored: Fri Jul 28 19:20:36 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Jul 28 19:21:51 2017 -0700 ---------------------------------------------------------------------- .../ConfigBasedCopyableDatasetFinder.java | 9 +-- .../copy/replication/ConfigBasedDataset.java | 9 ++- .../replication/ConfigBasedDatasetsFinder.java | 60 ++++++++--------- .../replication/ConfigBasedMultiDatasets.java | 68 ++++++++++++++++++-- .../ConfigBasedCleanabledDatasetFinder.java | 4 +- .../ConfigBasedDatasetsFinderTest.java | 36 ++++++++++- 6 files changed, 145 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d9d7d5f0/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedCopyableDatasetFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedCopyableDatasetFinder.java b/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedCopyableDatasetFinder.java index aef5234..ea46556 100644 ---
a/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedCopyableDatasetFinder.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedCopyableDatasetFinder.java @@ -18,7 +18,6 @@ package gobblin.data.management.copy.replication; -import gobblin.data.management.retention.dataset.ConfigurableCleanableDataset; import java.io.IOException; import java.net.URI; import java.util.Collection; @@ -29,6 +28,7 @@ import java.util.concurrent.Callable; import org.apache.hadoop.fs.FileSystem; import com.typesafe.config.Config; +import com.google.common.base.Optional; import gobblin.config.client.ConfigClient; import gobblin.dataset.Dataset; @@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j; * Based on the ConfigStore object to find all {@link ConfigBasedMultiDatasets} to replicate. * Specifically for replication job. * Normal DistcpNG Job which doesn't involve Dataflow concepts should not use this DatasetFinder but - * different implementation of {@link ConfigBasedDatasetsFinder}. + * different implementation of {@link ConfigBasedDatasetsFinder}.
*/ @Slf4j public class ConfigBasedCopyableDatasetFinder extends ConfigBasedDatasetsFinder { @@ -49,13 +49,14 @@ public class ConfigBasedCopyableDatasetFinder extends ConfigBasedDatasetsFinder } protected Callable<Void> findDatasetsCallable(final ConfigClient confClient, - final URI u, final Properties p, final Collection<Dataset> datasets) { + final URI u, final Properties p, Optional<List<String>> blacklistPatterns, final Collection<Dataset> datasets) { return new Callable<Void>() { @Override public Void call() throws Exception { // Process each {@link Config}, find dataset and add those into the datasets Config c = confClient.getConfig(u); - List<Dataset> datasetForConfig = new ConfigBasedMultiDatasets(c, p).getConfigBasedDatasetList(); + List<Dataset> datasetForConfig = + new ConfigBasedMultiDatasets(c, p, blacklistPatterns).getConfigBasedDatasetList(); datasets.addAll(datasetForConfig); return null; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d9d7d5f0/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDataset.java index bc892ed..27bb339 100644 --- a/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDataset.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDataset.java @@ -76,9 +76,16 @@ public class ConfigBasedDataset implements CopyableDataset { this.props = props; this.copyRoute = copyRoute; this.rc = rc; + calculateDatasetURN(); this.watermarkEnabled = Boolean.parseBoolean (this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true")); - calculateDatasetURN(); + } + + public 
ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute, String datasetURN) { + this.props = props; + this.copyRoute = copyRoute; + this.rc = rc; + this.datasetURN = datasetURN; } private void calculateDatasetURN(){ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d9d7d5f0/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinder.java b/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinder.java index e75f9a4..b4d16a5 100644 --- a/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinder.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinder.java @@ -33,7 +33,6 @@ import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -88,12 +87,13 @@ public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder { public static final String GOBBLIN_CONFIG_STORE_DATASET_COMMON_ROOT = ConfigurationKeys.CONFIG_BASED_PREFIX + ".dataset.common.root"; - // In addition to the white/blacklist tags, this configuration let the user to black/whitelist some datasets - // in the job-level configuration, which is not in configStore - // as to have easier approach to black/whitelist some datasets. - // The semantics keep still as tag, which the blacklist override whitelist if any dataset in common. 
+ // In addition to the white/blacklist tags, this configuration lets the user blacklist some datasets + // in the job-level configuration, which is not specified in configStore, + // so as to have an easier approach to black/whitelist some datasets on the operations side. + // The job-level blacklist is different from the tag-based blacklist since the latter is part of dataset discovery + // while the former is a filtering process. + // Tag-based dataset discovery happens first, before the job-level glob-pattern based filtering. public static final String JOB_LEVEL_BLACKLIST = CopyConfiguration.COPY_PREFIX + ".configBased.blacklist" ; - public static final String JOB_LEVEL_WHITELIST = CopyConfiguration.COPY_PREFIX + ".configBased.whitelist" ; // There are some cases that WATERMARK checking is desired, like // Unexpected data loss on target while not changing watermark accordingly. @@ -109,8 +109,10 @@ public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder { protected final Properties props; private final int threadPoolSize; - private Optional<List<String>> blacklistURNs; - private Optional<List<String>> whitelistURNs; + /** + * The blacklist Pattern will be used in the ConfigBasedDataset class, which has access to the FileSystem.
+ */ + private final Optional<List<String>> blacklistPatterns; public ConfigBasedDatasetsFinder(FileSystem fs, Properties jobProps) throws IOException { @@ -150,44 +152,39 @@ public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder { if (props.containsKey(JOB_LEVEL_BLACKLIST)) { - this.blacklistURNs = Optional.of(Splitter.on(",").omitEmptyStrings().splitToList(props.getProperty(JOB_LEVEL_BLACKLIST))); + this.blacklistPatterns = Optional.of(Splitter.on(",").omitEmptyStrings().splitToList(props.getProperty(JOB_LEVEL_BLACKLIST))); } else { - this.blacklistURNs = Optional.absent(); + this.blacklistPatterns = Optional.absent(); } - if (props.containsKey(JOB_LEVEL_WHITELIST)) { - this.whitelistURNs = Optional.of(Splitter.on(",").omitEmptyStrings().splitToList(props.getProperty(JOB_LEVEL_WHITELIST))); - } else { - this.whitelistURNs = Optional.absent(); - } } + /** + * Semantics of black/whitelist: + * - Whitelist always respects blacklist. + * - Job-level blacklist is responsible for dataset filtering instead of dataset discovery. i.e. + * There's no implementation of job-level whitelist currently. + */ protected Set<URI> getValidDatasetURIs(Path datasetCommonRoot) { Collection<URI> allDatasetURIs; Set<URI> disabledURISet = new HashSet(); - if (this.blacklistURNs.isPresent()) { - for(String urn : this.blacklistURNs.get()) { - disabledURISet.add(this.datasetURNtoURI(urn)); - } - } + // This try block basically populates the valid dataset URI set. try { - // get all the URIs which imports {@link #replicationTag} or all from whitelistURNs - allDatasetURIs = this.whitelistURNs.isPresent() - ?
this.whitelistURNs.get().stream().map(u -> this.datasetURNtoURI(u)).collect(Collectors.toList()) : configClient.getImportedBy(new URI(whitelistTag.toString()), true); - populateDisabledURIs(disabledURISet); + allDatasetURIs = configClient.getImportedBy(new URI(whitelistTag.toString()), true); + enhanceDisabledURIsWithBlackListTag(disabledURISet); } catch ( ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException | URISyntaxException e) { log.error("Caught error while getting all the datasets URIs " + e.getMessage()); throw new RuntimeException(e); } - return getValidDatasetURIs(allDatasetURIs, disabledURISet, datasetCommonRoot); + return getValidDatasetURIsHelper(allDatasetURIs, disabledURISet, datasetCommonRoot); } /** * Extended signature for testing convenience. */ - protected static Set<URI> getValidDatasetURIs(Collection<URI> allDatasetURIs, Set<URI> disabledURISet, Path datasetCommonRoot){ + protected static Set<URI> getValidDatasetURIsHelper(Collection<URI> allDatasetURIs, Set<URI> disabledURISet, Path datasetCommonRoot){ if (allDatasetURIs == null || allDatasetURIs.isEmpty()) { return ImmutableSet.of(); } @@ -237,13 +234,12 @@ public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder { return validURISet; } - private void populateDisabledURIs(Set<URI> disabledURIs) throws + private void enhanceDisabledURIsWithBlackListTag(Set<URI> disabledURIs) throws URISyntaxException, ConfigStoreFactoryDoesNotExistsException, ConfigStoreCreationException, VersionDoesNotExistException { if (this.blacklistTags.isPresent()) { - disabledURIs = new HashSet<URI>(); for (Path s : this.blacklistTags.get()) { disabledURIs.addAll(configClient.getImportedBy(new URI(s.toString()), true)); } @@ -276,7 +272,7 @@ public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder { Iterators.transform(leafDatasets.iterator(), new Function<URI, Callable<Void>>() { @Override public Callable<Void> apply(final URI datasetURI) { - return 
findDatasetsCallable(configClient, datasetURI, props, result); + return findDatasetsCallable(configClient, datasetURI, props, blacklistPatterns, result); } }); @@ -298,6 +294,8 @@ public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder { /** * Helper funcition for converting datasetURN into URI + * Note that here the URN can possibly being specified with pattern, i.e. with wildcards like `*` + * It will be resolved by configStore. */ private URI datasetURNtoURI(String datasetURN) { try { @@ -309,5 +307,7 @@ public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder { } protected abstract Callable<Void> findDatasetsCallable(final ConfigClient confClient, - final URI u, final Properties p, final Collection<Dataset> datasets); + final URI u, final Properties p, Optional<List<String>> blacklistPatterns, + final Collection<Dataset> datasets); + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d9d7d5f0/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java b/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java index 2461242..50fb2a41 100644 --- a/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/copy/replication/ConfigBasedMultiDatasets.java @@ -17,6 +17,7 @@ package gobblin.data.management.copy.replication; +import avro.shaded.com.google.common.annotations.VisibleForTesting; import gobblin.dataset.Dataset; import java.io.IOException; import java.net.URI; @@ -24,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.regex.Pattern; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -52,16 +54,25 @@ public class ConfigBasedMultiDatasets { private final Properties props; private final List<Dataset> datasets = new ArrayList<>(); + private Optional<List<Pattern>> blacklist = Optional.of(new ArrayList<>()); + /** * if push mode is set in property, only replicate data when * 1. Push mode is set in Config store - * 2. CopyTo cluster in sync with property with {@link #ConfigurationKeys.WRITER_FILE_SYSTEM_URI} + * 2. CopyTo cluster in sync with property with 'writer.fs.uri' */ public static final String REPLICATION_PUSH_MODE = CopyConfiguration.COPY_PREFIX + ".replicationPushMode"; - public ConfigBasedMultiDatasets (Config c, Properties props){ + // Dummy constructor, return empty datasets. + public ConfigBasedMultiDatasets(){ + this.props = new Properties(); + } + + public ConfigBasedMultiDatasets (Config c, Properties props, + Optional<List<String>> blacklistPatterns){ this.props = props; + blacklist = patternListInitHelper(blacklistPatterns); try { FileSystem executionCluster = FileSystem.get(new Configuration()); @@ -86,6 +97,19 @@ public class ConfigBasedMultiDatasets { } } + private Optional<List<Pattern>> patternListInitHelper(Optional<List<String>> patterns){ + if (patterns.isPresent() && patterns.get().size() >= 1) { + List<Pattern> tmpPatterns = new ArrayList<>(); + for (String pattern : patterns.get()){ + tmpPatterns.add(Pattern.compile(pattern)); + } + return Optional.of(tmpPatterns); + } + else{ + return Optional.absent(); + } + } + private void generateDatasetInPushMode(ReplicationConfiguration rc, URI executionClusterURI){ if(rc.getCopyMode()== ReplicationCopyMode.PULL){ log.info("Skip process pull mode dataset with meta data{} as job level property specify push mode ", rc.getMetaData()); @@ -116,7 +140,15 @@ public class ConfigBasedMultiDatasets { HadoopFsEndPoint ep = (HadoopFsEndPoint)cr.getCopyTo(); 
if(ep.getFsURI().toString().equals(pushModeTargetCluster)){ - this.datasets.add(new ConfigBasedDataset(rc, this.props, cr)); + // For a candidate dataset, iterate thru. all available blacklist patterns. + ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, this.props, cr); + if (blacklistFilteringHelper(configBasedDataset, this.blacklist)){ + this.datasets.add(configBasedDataset); + } + else{ + log.info("Dataset" + configBasedDataset.datasetURN() + " has been filtered out because of blacklist pattern:" + + this.blacklist.get().toString()); + } } } }// inner for loops ends @@ -138,12 +170,40 @@ public class ConfigBasedMultiDatasets { if(needGenerateCopyEntity(replica, executionClusterURI)){ Optional<CopyRoute> copyRoute = cpGen.getPullRoute(rc, replica); if(copyRoute.isPresent()){ - this.datasets.add(new ConfigBasedDataset(rc, this.props, copyRoute.get())); + ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, this.props, copyRoute.get()); + if (blacklistFilteringHelper(configBasedDataset, this.blacklist)){ + this.datasets.add(configBasedDataset); + } + else{ + log.info("Dataset" + configBasedDataset.datasetURN() + " has been filtered out because of blacklist pattern:" + + this.blacklist.get().toString()); + } } } } } + @VisibleForTesting + /** + * Return false if the target configBasedDataset should be kept in the blacklist. + */ + public boolean blacklistFilteringHelper(ConfigBasedDataset configBasedDataset, Optional<List<Pattern>> patternList){ + String datasetURN = configBasedDataset.datasetURN(); + if (patternList.isPresent()) { + for(Pattern pattern: patternList.get()) { + if (pattern.matcher(datasetURN).find()){ + return false; + } + } + // If the dataset get thru. all blacklist check, accept it. + return true; + } + // If blacklist not specified, automatically accept the dataset. 
+ else { + return true; + } + } + public List<Dataset> getConfigBasedDatasetList(){ return this.datasets; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d9d7d5f0/gobblin-data-management/src/main/java/gobblin/data/management/retention/profile/ConfigBasedCleanabledDatasetFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/gobblin/data/management/retention/profile/ConfigBasedCleanabledDatasetFinder.java b/gobblin-data-management/src/main/java/gobblin/data/management/retention/profile/ConfigBasedCleanabledDatasetFinder.java index e008e26..7dcf0a1 100644 --- a/gobblin-data-management/src/main/java/gobblin/data/management/retention/profile/ConfigBasedCleanabledDatasetFinder.java +++ b/gobblin-data-management/src/main/java/gobblin/data/management/retention/profile/ConfigBasedCleanabledDatasetFinder.java @@ -16,9 +16,11 @@ */ package gobblin.data.management.retention.profile; +import com.google.common.base.Optional; import java.io.IOException; import java.net.URI; import java.util.Collection; +import java.util.List; import java.util.Properties; import java.util.concurrent.Callable; @@ -52,7 +54,7 @@ public class ConfigBasedCleanabledDatasetFinder extends ConfigBasedDatasetsFinde } protected Callable<Void> findDatasetsCallable(final ConfigClient confClient, - final URI u, final Properties p, final Collection<Dataset> datasets) { + final URI u, final Properties p, Optional<List<String>> blacklistURNs, final Collection<Dataset> datasets) { return new Callable<Void>() { @Override public Void call() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d9d7d5f0/gobblin-data-management/src/test/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinderTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-data-management/src/test/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinderTest.java b/gobblin-data-management/src/test/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinderTest.java index c42788d..634e34a 100644 --- a/gobblin-data-management/src/test/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinderTest.java +++ b/gobblin-data-management/src/test/java/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinderTest.java @@ -17,15 +17,20 @@ package gobblin.data.management.copy.replication; +import com.google.common.base.Optional; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.List; +import java.util.Properties; import java.util.Set; +import java.util.regex.Pattern; import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -64,11 +69,40 @@ public class ConfigBasedDatasetsFinderTest { Set<URI> disabled = new HashSet<URI>(); disabled.add(new URI("/data/derived/gowl/pymk/invitationsCreationsSends/hourly_data/aggregation/daily")); - Set<URI> validURIs = ConfigBasedDatasetsFinder.getValidDatasetURIs(allDatasetURIs, disabled, new Path("/data/derived")); + Set<URI> validURIs = ConfigBasedDatasetsFinder.getValidDatasetURIsHelper(allDatasetURIs, disabled, new Path("/data/derived")); Assert.assertTrue(validURIs.size() == 3); Assert.assertTrue(validURIs.contains(new URI("/data/derived/gowl/pymk/invitationsCreationsSends/hourly_data/aggregation/daily_dedup"))); Assert.assertTrue(validURIs.contains(new URI("/data/derived/browsemaps/entities/comp"))); Assert.assertTrue(validURIs.contains(new URI("/data/derived/browsemaps/entities/anet"))); } + + @Test + public void blacklistPatternTest() { + Properties properties = new Properties(); + properties.setProperty("gobblin.selected.policy", "random"); + 
properties.setProperty("source","random"); + properties.setProperty("replicas", "random"); + + ConfigBasedMultiDatasets configBasedMultiDatasets = new ConfigBasedMultiDatasets(); + + ReplicationConfiguration rc = Mockito.mock(ReplicationConfiguration.class); + CopyRoute cr = Mockito.mock(CopyRoute.class); + ConfigBasedDataset configBasedDataset = new ConfigBasedDataset(rc, new Properties(), cr, "/test/tmp/word"); + ConfigBasedDataset configBasedDataset2 = new ConfigBasedDataset(rc, new Properties(), cr, "/test/a_temporary/word"); + ConfigBasedDataset configBasedDataset3 = new ConfigBasedDataset(rc, new Properties(), cr, "/test/go/word"); + + + Pattern pattern1 = Pattern.compile(".*_temporary.*"); + Pattern pattern2 = Pattern.compile(".*tmp.*"); + List<Pattern> patternList = new ArrayList<>(); + patternList.add(pattern1); + patternList.add(pattern2); + + Assert.assertFalse(configBasedMultiDatasets.blacklistFilteringHelper(configBasedDataset, Optional.of(patternList))); + Assert.assertFalse(configBasedMultiDatasets.blacklistFilteringHelper(configBasedDataset2, Optional.of(patternList))); + Assert.assertTrue(configBasedMultiDatasets.blacklistFilteringHelper(configBasedDataset3, Optional.of(patternList))); + + + } }
