[GOBBLIN-397] Create a new dataset version selection policy for filtering dataset versions that have "hidden" paths.
Closes #2271 from sv2000/gobblin-397 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ff13dde1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ff13dde1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ff13dde1 Branch: refs/heads/0.12.0 Commit: ff13dde1c88d21048494cf79fdf2319c488b81c5 Parents: 161bef0 Author: suvasude <[email protected]> Authored: Wed Jan 31 14:00:49 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Jan 31 14:00:49 2018 -0800 ---------------------------------------------------------------------- .../policy/HiddenFilterSelectionPolicy.java | 90 ++++++++++++++++++++ .../policy/HiddenFilterSelectionPolicyTest.java | 77 +++++++++++++++++ 2 files changed, 167 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ff13dde1/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/HiddenFilterSelectionPolicy.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/HiddenFilterSelectionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/HiddenFilterSelectionPolicy.java new file mode 100644 index 0000000..1c515ae --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/HiddenFilterSelectionPolicy.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.policy;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A {@link VersionSelectionPolicy} that selects only the dataset versions whose paths pass the hidden path filter.
+ *
+ * <p>A path is treated as hidden when its own name, or the name of any of its ancestors, starts with one of the
+ * configured prefixes. A version is selected only if none of the paths returned by
+ * {@link FileSystemDatasetVersion#getPaths()} is hidden.
+ *
+ * <p>The prefixes are read from {@link #HIDDEN_FILTER_HIDDEN_FILE_PREFIX_KEY}. When that key is absent, the
+ * prefixes {@code "."} and {@code "_"} are used.
+ */
+public class HiddenFilterSelectionPolicy implements VersionSelectionPolicy<FileSystemDatasetVersion> {
+  /** Config key under which the hidden-name prefixes are looked up. */
+  public static final String HIDDEN_FILTER_HIDDEN_FILE_PREFIX_KEY = "selection.hiddenFilter.hiddenFilePrefix";
+  /** Prefixes used when {@link #HIDDEN_FILTER_HIDDEN_FILE_PREFIX_KEY} is not present in the config. */
+  private static final String[] DEFAULT_HIDDEN_FILE_PREFIXES = {".", "_"};
+  /** Name prefixes that mark a path component as hidden; assigned exactly once in the constructor. */
+  private final List<String> hiddenFilePrefixes;
+
+  /**
+   * Creates the policy from a config.
+   *
+   * @param config if it has {@link #HIDDEN_FILTER_HIDDEN_FILE_PREFIX_KEY}, the prefixes are read from it through
+   *        {@code ConfigUtils.getStringList}; otherwise the default prefixes are used
+   */
+  public HiddenFilterSelectionPolicy(Config config) {
+    if (config.hasPath(HIDDEN_FILTER_HIDDEN_FILE_PREFIX_KEY)) {
+      this.hiddenFilePrefixes = ConfigUtils.getStringList(config, HIDDEN_FILTER_HIDDEN_FILE_PREFIX_KEY);
+    } else {
+      this.hiddenFilePrefixes = Arrays.asList(DEFAULT_HIDDEN_FILE_PREFIXES);
+    }
+  }
+
+  /**
+   * @return the dataset version type this policy operates on, {@link FileSystemDatasetVersion}
+   */
+  @Override
+  public Class<? extends FileSystemDatasetVersion> versionClass() {
+    return FileSystemDatasetVersion.class;
+  }
+
+  /**
+   * Checks the given path and each of its ancestors for a name that starts with a hidden prefix.
+   *
+   * @param path the path to inspect; {@code null} yields {@code false}
+   * @return {@code true} as soon as one component name starts with any configured prefix, {@code false} otherwise
+   */
+  private boolean isPathHidden(Path path) {
+    // Walk from the leaf towards the root; getParent() returning null ends the walk.
+    while (path != null) {
+      String name = path.getName();
+      for (String prefix : this.hiddenFilePrefixes) {
+        // NOTE(review): an empty-string prefix would match every name, because
+        // String.startsWith("") is always true - confirm such a value cannot reach this list.
+        if (name.startsWith(prefix)) {
+          return true;
+        }
+      }
+      path = path.getParent();
+    }
+    return false;
+  }
+
+  /**
+   * Builds the predicate used to filter versions.
+   *
+   * @return a predicate that accepts a version only when none of its paths is hidden
+   */
+  private Predicate<FileSystemDatasetVersion> getSelectionPredicate() {
+    return new Predicate<FileSystemDatasetVersion>() {
+      @Override
+      public boolean apply(FileSystemDatasetVersion version) {
+        Set<Path> paths = version.getPaths();
+        for (Path path : paths) {
+          // getPathWithoutSchemeAndAuthority is a static method of Path, so it is called on the class
+          // rather than through the instance.
+          Path p = Path.getPathWithoutSchemeAndAuthority(path);
+          if (isPathHidden(p)) {
+            return false;
+          }
+        }
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Filters the given versions down to those with no hidden path.
+   *
+   * @param allVersions the candidate versions
+   * @return a new list holding the versions accepted by the hidden path filter
+   */
+  @Override
+  public Collection<FileSystemDatasetVersion> listSelectedVersions(List<FileSystemDatasetVersion> allVersions) {
+    // Copy the filtered collection into a fresh list before returning it.
+    return Lists.newArrayList(Collections2.filter(allVersions, getSelectionPredicate()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ff13dde1/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/HiddenFilterSelectionPolicyTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/HiddenFilterSelectionPolicyTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/HiddenFilterSelectionPolicyTest.java
new file mode 100644
index 0000000..5c08b7c
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/policy/HiddenFilterSelectionPolicyTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.policy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.data.management.version.FileSystemDatasetVersion;
+import org.apache.gobblin.data.management.version.TimestampedDatasetVersion;
+
+
+/**
+ * Exercises {@link HiddenFilterSelectionPolicy#listSelectedVersions(List)} with the hidden prefixes supplied both
+ * as a list value and as a single comma-separated string.
+ */
+public class HiddenFilterSelectionPolicyTest {
+  /**
+   * Feeds four versions to the policy, two of which sit under a directory whose name starts with "." or "_",
+   * and expects exactly the two other versions to be selected for each config variant.
+   */
+  @Test
+  public void testListSelectedVersions() throws Exception {
+    Path visibleVersion1 = new Path("/data/dataset/versions/version1");
+    Path visibleVersion2 = new Path("/data/dataset/versions/version2");
+    Path dotPrefixed = new Path("/data/dataset/.temp/tmpPath");
+    Path underscorePrefixed = new Path("/data/dataset/_temp/tmpPath");
+
+    // Only these two paths may show up in the selected versions.
+    Set<String> expectedPaths = new HashSet<>();
+    expectedPaths.add(visibleVersion1.toString());
+    expectedPaths.add(visibleVersion2.toString());
+
+    List<FileSystemDatasetVersion> candidates = new ArrayList<>();
+    for (Path candidatePath : Arrays.asList(visibleVersion1, visibleVersion2, dotPrefixed, underscorePrefixed)) {
+      candidates.add(new TimestampedDatasetVersion(new DateTime(), candidatePath));
+    }
+
+    // The same prefixes expressed two ways: as a list and as one comma-separated string.
+    String prefixKey = HiddenFilterSelectionPolicy.HIDDEN_FILTER_HIDDEN_FILE_PREFIX_KEY;
+    List<String> prefixList = Arrays.asList("_", ".");
+    Config listConfig = ConfigFactory.parseMap(ImmutableMap.of(prefixKey, prefixList));
+    Config csvConfig = ConfigFactory.parseMap(ImmutableMap.of(prefixKey, "_,."));
+
+    for (Config config : Arrays.asList(listConfig, csvConfig)) {
+      Collection<FileSystemDatasetVersion> selected =
+          new HiddenFilterSelectionPolicy(config).listSelectedVersions(candidates);
+      Assert.assertEquals(selected.size(), 2);
+      for (FileSystemDatasetVersion version : selected) {
+        for (Path selectedPath : version.getPaths()) {
+          Assert.assertTrue(expectedPaths.contains(selectedPath.toString()));
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
