[GOBBLIN-381][GOBBLIN-368] Add ability to filter hidden directories for ConfigBasedDatasets
Closes #2260 from sv2000/gobblin-381 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/11abf9f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/11abf9f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/11abf9f5 Branch: refs/heads/0.12.0 Commit: 11abf9f5f604a90376f73c01ed5e4b73b14c35cf Parents: f2f6e46 Author: suvasude <[email protected]> Authored: Wed Jan 24 14:43:56 2018 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Wed Jan 24 14:43:56 2018 -0800 ---------------------------------------------------------------------- .../copy/replication/ConfigBasedDataset.java | 64 ++++---- .../copy/replication/HadoopFsEndPoint.java | 12 +- .../replication/ReplicaHadoopFsEndPoint.java | 53 +++--- .../replication/SourceHadoopFsEndPoint.java | 37 +++-- .../replication/ConfigBasedDatasetTest.java | 162 +++++++++++++++++++ .../replication/ConfigBasedDatasetsTest.java | 135 ---------------- .../configBasedDatasetTest/src/_dir1/file1 | 1 + .../configBasedDatasetTest/src/_dir1/file2 | 1 + 8 files changed, 263 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java index 3881323..293034b 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java @@ -29,8 +29,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -71,14 +69,21 @@ public class ConfigBasedDataset implements CopyableDataset { private final ReplicationConfiguration rc; private String datasetURN; private boolean watermarkEnabled; + private final PathFilter pathFilter; + + //Apply filter to directories + private final boolean applyFilterToDirectories; public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute) { this.props = props; this.copyRoute = copyRoute; this.rc = rc; calculateDatasetURN(); - this.watermarkEnabled = Boolean.parseBoolean - (this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true")); + this.watermarkEnabled = + Boolean.parseBoolean(this.props.getProperty(ConfigBasedDatasetsFinder.WATERMARK_ENABLE, "true")); + this.pathFilter = DatasetUtils.instantiatePathFilter(this.props); + this.applyFilterToDirectories = + Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false")); } public ConfigBasedDataset(ReplicationConfiguration rc, Properties props, CopyRoute copyRoute, String datasetURN) { @@ -86,9 +91,12 @@ public class ConfigBasedDataset implements CopyableDataset { this.copyRoute = copyRoute; this.rc = rc; this.datasetURN = datasetURN; + this.pathFilter = DatasetUtils.instantiatePathFilter(this.props); + this.applyFilterToDirectories = + Boolean.parseBoolean(this.props.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false")); } - private void calculateDatasetURN(){ + private void calculateDatasetURN() { EndPoint e = this.copyRoute.getCopyTo(); if (e instanceof HadoopFsEndPoint) { HadoopFsEndPoint copyTo = (HadoopFsEndPoint) e; @@ -120,6 +128,14 @@ public class ConfigBasedDataset implements CopyableDataset { return copyableFiles; } + //For {@link HadoopFsEndPoint}s, set pathfilter and applyFilterToDirectories + HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw; + HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw; + copyFrom.setPathFilter(pathFilter); + copyFrom.setApplyFilterToDirectories(applyFilterToDirectories); + copyTo.setPathFilter(pathFilter); + copyTo.setApplyFilterToDirectories(applyFilterToDirectories); + if (this.watermarkEnabled) { if ((!copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent()) || ( copyFromRaw.getWatermark().isPresent() && copyToRaw.getWatermark().isPresent() @@ -132,8 +148,6 @@ public class ConfigBasedDataset implements CopyableDataset { } } - HadoopFsEndPoint copyFrom = (HadoopFsEndPoint) copyFromRaw; - HadoopFsEndPoint copyTo = (HadoopFsEndPoint) copyToRaw; Configuration conf = HadoopUtils.newConfiguration(); FileSystem copyFromFs = FileSystem.get(copyFrom.getFsURI(), conf); FileSystem copyToFs = FileSystem.get(copyTo.getFsURI(), conf); @@ -141,20 +155,10 @@ public class ConfigBasedDataset implements CopyableDataset { Collection<FileStatus> allFilesInSource = copyFrom.getFiles(); Collection<FileStatus> allFilesInTarget = copyTo.getFiles(); - final PathFilter pathFilter = DatasetUtils.instantiatePathFilter(this.props); - Predicate<FileStatus> predicate = new Predicate<FileStatus>() { - @Override - public boolean apply(FileStatus input) { - return pathFilter.accept(input.getPath()); - } - }; - - Set<FileStatus> copyFromFileStatuses = Sets.newHashSet(Collections2.filter(allFilesInSource, predicate)); + Set<FileStatus> copyFromFileStatuses = Sets.newHashSet(allFilesInSource); Map<Path, FileStatus> copyToFileMap = Maps.newHashMap(); - for(FileStatus f: allFilesInTarget){ - if(pathFilter.accept(f.getPath())){ - copyToFileMap.put(PathUtils.getPathWithoutSchemeAndAuthority(f.getPath()), f); - } + for (FileStatus f : allFilesInTarget) { + copyToFileMap.put(PathUtils.getPathWithoutSchemeAndAuthority(f.getPath()), f); } Collection<Path> deletedPaths = Lists.newArrayList(); @@ -184,10 +188,11 @@ public class ConfigBasedDataset implements CopyableDataset { deletedPaths.add(newPath); } - copyableFiles - .add(CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration) - .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()).build()); - + copyableFiles.add( + CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), + copyConfiguration) + .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()) + .build()); } // clean up already checked paths @@ -202,18 +207,17 @@ public class ConfigBasedDataset implements CopyableDataset { // delete old files first if (!deletedPaths.isEmpty()) { DeleteFileCommitStep deleteCommitStep = DeleteFileCommitStep.fromPaths(copyToFs, deletedPaths, this.props); - copyableFiles.add(new PrePublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String> newHashMap(), - deleteCommitStep, 0)); + copyableFiles.add( + new PrePublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String>newHashMap(), deleteCommitStep, + 0)); } // generate the watermark file even if watermark checking is disabled. Make sure it can come into functional once disired. if ((!watermarkMetadataCopied) && copyFrom.getWatermark().isPresent()) { - copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String> newHashMap(), + copyableFiles.add(new PostPublishStep(copyTo.getDatasetPath().toString(), Maps.<String, String>newHashMap(), new WatermarkMetadataGenerationCommitStep(copyTo.getFsURI().toString(), copyTo.getDatasetPath(), - copyFrom.getWatermark().get()), - 1)); + copyFrom.getWatermark().get()), 1)); } return copyableFiles; } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java index 97fd20e..ea93ea8 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/HadoopFsEndPoint.java @@ -20,6 +20,8 @@ package org.apache.gobblin.data.management.copy.replication; import java.io.IOException; import java.net.URI; +import lombok.Getter; +import lombok.Setter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -28,9 +30,15 @@ import com.typesafe.config.Config; import org.apache.gobblin.util.HadoopUtils; import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.fs.PathFilter; + @Slf4j -public abstract class HadoopFsEndPoint implements EndPoint{ +@Getter +@Setter +public abstract class HadoopFsEndPoint implements EndPoint { + private PathFilter pathFilter; + private boolean applyFilterToDirectories; /** * @@ -56,7 +64,7 @@ public abstract class HadoopFsEndPoint implements EndPoint{ * @param path The path to be checked. For fs availability checking, just use "/" * @return If the filesystem/path exists or not. */ - public boolean isPathAvailable(Path path){ + public boolean isPathAvailable(Path path) { try { Configuration conf = HadoopUtils.newConfiguration(); FileSystem fs = FileSystem.get(this.getFsURI(), conf); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java index 8ab4e7e..024b239 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ReplicaHadoopFsEndPoint.java @@ -72,30 +72,31 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint { } @Override - public synchronized Collection<FileStatus> getFiles() throws IOException{ - if(filesInitialized){ + public synchronized Collection<FileStatus> getFiles() throws IOException { + if (filesInitialized) { return this.allFileStatus; } this.filesInitialized = true; FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration()); - if(!fs.exists(this.rc.getPath())){ + if (!fs.exists(this.rc.getPath())) { return Collections.emptyList(); } Collection<Path> validPaths = ReplicationDataValidPathPicker.getValidPaths(this); - //ReplicationDataValidPathPicker.getValidPaths(fs, this.rc.getPath(), this.rdc); + //ReplicationDataValidPathPicker.getValidPaths(fs, this.rc.getPath(), this.rdc); - for(Path p: validPaths){ - this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, p)); + for (Path p : validPaths) { + this.allFileStatus.addAll( + FileListUtils.listFilesRecursively(fs, p, super.getPathFilter(), super.isApplyFilterToDirectories())); } return this.allFileStatus; } @Override public synchronized Optional<ComparableWatermark> getWatermark() { - if(this.watermarkInitialized) { + if (this.watermarkInitialized) { return this.cachedWatermark; } @@ -104,12 +105,12 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint { Path metaData = new Path(rc.getPath(), WATERMARK_FILE); FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration()); if (fs.exists(metaData)) { - try(FSDataInputStream fin = fs.open(metaData)){ + try (FSDataInputStream fin = fs.open(metaData)) { InputStreamReader reader = new InputStreamReader(fin, Charsets.UTF_8); String content = CharStreams.toString(reader); Watermark w = WatermarkMetadataUtil.deserialize(content); - if(w instanceof ComparableWatermark){ - this.cachedWatermark = Optional.of((ComparableWatermark)w); + if (w instanceof ComparableWatermark) { + this.cachedWatermark = Optional.of((ComparableWatermark) w); } } return this.cachedWatermark; @@ -120,7 +121,7 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint { } catch (IOException e) { log.warn("Can not find " + WATERMARK_FILE + " for replica " + this); return this.cachedWatermark; - } catch (WatermarkMetadataUtil.WatermarkMetadataMulFormatException e){ + } catch (WatermarkMetadataUtil.WatermarkMetadataMulFormatException e) { log.warn("Can not create watermark from " + WATERMARK_FILE + " for replica " + this); return this.cachedWatermark; } @@ -143,8 +144,11 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint { @Override public String toString() { - return Objects.toStringHelper(this.getClass()).add("is source", this.isSource()).add("end point name", this.getEndPointName()) - .add("hadoopfs config", this.rc).toString(); + return Objects.toStringHelper(this.getClass()) + .add("is source", this.isSource()) + .add("end point name", this.getEndPointName()) + .add("hadoopfs config", this.rc) + .toString(); } @Override @@ -153,7 +157,7 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint { } @Override - public Path getDatasetPath(){ + public Path getDatasetPath() { return this.rc.getPath(); } @@ -168,23 +172,30 @@ public class ReplicaHadoopFsEndPoint extends HadoopFsEndPoint { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } ReplicaHadoopFsEndPoint other = (ReplicaHadoopFsEndPoint) obj; if (rc == null) { - if (other.rc != null) + if (other.rc != null) { return false; - } else if (!rc.equals(other.rc)) + } + } else if (!rc.equals(other.rc)) { return false; + } if (replicaName == null) { - if (other.replicaName != null) + if (other.replicaName != null) { return false; - } else if (!replicaName.equals(other.replicaName)) + } + } else if (!replicaName.equals(other.replicaName)) { return false; + } return true; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java index 0769c5c..2a56f2e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/SourceHadoopFsEndPoint.java @@ -39,7 +39,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{ +public class SourceHadoopFsEndPoint extends HadoopFsEndPoint { @Getter private final HadoopFsReplicaConfig rc; @@ -57,8 +57,8 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{ } @Override - public synchronized Collection<FileStatus> getFiles() throws IOException{ - if(!this.initialized){ + public synchronized Collection<FileStatus> getFiles() throws IOException { + if (!this.initialized) { this.getWatermark(); } return this.allFileStatus; @@ -66,7 +66,7 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{ @Override public synchronized Optional<ComparableWatermark> getWatermark() { - if(this.initialized) { + if (this.initialized) { return this.cachedWatermark; } try { @@ -74,8 +74,9 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{ FileSystem fs = FileSystem.get(rc.getFsURI(), new Configuration()); Collection<Path> validPaths = ReplicationDataValidPathPicker.getValidPaths(this); - for(Path p: validPaths){ - this.allFileStatus.addAll(FileListUtils.listFilesRecursively(fs, p)); + for (Path p : validPaths) { + this.allFileStatus.addAll( + FileListUtils.listFilesRecursively(fs, p, super.getPathFilter(), super.isApplyFilterToDirectories())); } for (FileStatus f : this.allFileStatus) { @@ -115,8 +116,11 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{ @Override public String toString() { - return Objects.toStringHelper(this.getClass()).add("is source", this.isSource()).add("end point name", this.getEndPointName()) - .add("hadoopfs config", this.rc).toString(); + return Objects.toStringHelper(this.getClass()) + .add("is source", this.isSource()) + .add("end point name", this.getEndPointName()) + .add("hadoopfs config", this.rc) + .toString(); } @Override @@ -125,7 +129,7 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{ } @Override - public Path getDatasetPath(){ + public Path getDatasetPath() { return this.rc.getPath(); } @@ -139,18 +143,23 @@ public class SourceHadoopFsEndPoint extends HadoopFsEndPoint{ @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null) + } + if (obj == null) { return false; - if (getClass() != obj.getClass()) + } + if (getClass() != obj.getClass()) { return false; + } SourceHadoopFsEndPoint other = (SourceHadoopFsEndPoint) obj; if (rc == null) { - if (other.rc != null) + if (other.rc != null) { return false; - } else if (!rc.equals(other.rc)) + } + } else if (!rc.equals(other.rc)) { return false; + } return true; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java new file mode 100644 index 0000000..f925243 --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.replication; + +import java.net.URI; +import java.util.Collection; +import java.util.Properties; +import java.util.Set; + +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.data.management.dataset.DatasetUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; +import com.google.common.collect.Sets; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyEntity; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.PreserveAttributes; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; +import org.apache.gobblin.data.management.copy.entities.PrePublishStep; +import org.apache.gobblin.source.extractor.ComparableWatermark; +import org.apache.gobblin.source.extractor.extract.LongWatermark; +import org.apache.gobblin.util.FileListUtils; +import org.apache.gobblin.util.PathUtils; +import org.apache.gobblin.util.commit.DeleteFileCommitStep; + + +/** + * Unit test for {@link ConfigBasedDataset} + * @author mitu + * + */ +@Test(groups = {"gobblin.data.management.copy.replication"}) +@Slf4j +public class ConfigBasedDatasetTest { + + public Collection<? extends CopyEntity> testGetCopyableFilesHelper(String sourceDir, String destinationDir, + long sourceWatermark, boolean isFilterEnabled) throws Exception { + FileSystem localFs = FileSystem.getLocal(new Configuration()); + URI local = localFs.getUri(); + + Properties properties = new Properties(); + properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/publisher"); + PathFilter pathFilter = DatasetUtils.instantiatePathFilter(properties); + boolean applyFilterToDirectories = false; + if (isFilterEnabled) { + properties.setProperty(DatasetUtils.CONFIGURATION_KEY_PREFIX + "path.filter.class", + "org.apache.gobblin.util.filters.HiddenFilter"); + properties.setProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "true"); + + pathFilter = DatasetUtils.instantiatePathFilter(properties); + applyFilterToDirectories = + Boolean.parseBoolean(properties.getProperty(CopyConfiguration.APPLY_FILTER_TO_DIRECTORIES, "false")); + } + + CopyConfiguration copyConfiguration = + CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), properties) + .publishDir(new Path(destinationDir)) + .preserve(PreserveAttributes.fromMnemonicString("ugp")) + .build(); + + ReplicationMetaData mockMetaData = Mockito.mock(ReplicationMetaData.class); + Mockito.when(mockMetaData.toString()).thenReturn("Mock Meta Data"); + + ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class); + Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL); + Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData); + + HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class); + Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir)); + Mockito.when(copyFrom.getFsURI()).thenReturn(local); + ComparableWatermark sw = new LongWatermark(sourceWatermark); + Mockito.when(copyFrom.getWatermark()).thenReturn(Optional.of(sw)); + Mockito.when(copyFrom.getFiles()) + .thenReturn( + FileListUtils.listFilesRecursively(localFs, new Path(sourceDir), pathFilter, applyFilterToDirectories)); + + HadoopFsEndPoint copyTo = Mockito.mock(HadoopFsEndPoint.class); + Mockito.when(copyTo.getDatasetPath()).thenReturn(new Path(destinationDir)); + Mockito.when(copyTo.getFsURI()).thenReturn(local); + Optional<ComparableWatermark> tmp = Optional.absent(); + Mockito.when(copyTo.getWatermark()).thenReturn(tmp); + Mockito.when(copyTo.getFiles()) + .thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(destinationDir), pathFilter, + applyFilterToDirectories)); + + CopyRoute route = Mockito.mock(CopyRoute.class); + Mockito.when(route.getCopyFrom()).thenReturn(copyFrom); + Mockito.when(route.getCopyTo()).thenReturn(copyTo); + + ConfigBasedDataset dataset = new ConfigBasedDataset(mockRC, properties, route); + Collection<? extends CopyEntity> copyableFiles = dataset.getCopyableFiles(localFs, copyConfiguration); + return copyableFiles; + } + + @Test + public void testGetCopyableFiles() throws Exception { + String sourceDir = getClass().getClassLoader().getResource("configBasedDatasetTest/src").getFile(); + String destinationDir = getClass().getClassLoader().getResource("configBasedDatasetTest/dest").getFile(); + long sourceWatermark = 100L; + + Collection<? extends CopyEntity> copyableFiles = + testGetCopyableFilesHelper(sourceDir, destinationDir, sourceWatermark, false); + Assert.assertEquals(copyableFiles.size(), 8); + copyableFiles = testGetCopyableFilesHelper(sourceDir, destinationDir, sourceWatermark, true); + Assert.assertEquals(copyableFiles.size(), 6); + + Set<Path> paths = + Sets.newHashSet(new Path("dir1/file2"), new Path("dir1/file1"), new Path("dir2/file1"), new Path("dir2/file3")); + for (CopyEntity copyEntity : copyableFiles) { + if (copyEntity instanceof CopyableFile) { + CopyableFile file = (CopyableFile) copyEntity; + Path originRelativePath = + PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getOrigin().getPath()), + PathUtils.getPathWithoutSchemeAndAuthority(new Path(sourceDir))); + Path targetRelativePath = + PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getDestination()), + PathUtils.getPathWithoutSchemeAndAuthority(new Path(destinationDir))); + + Assert.assertTrue(paths.contains(originRelativePath)); + Assert.assertTrue(paths.contains(targetRelativePath)); + Assert.assertEquals(originRelativePath, targetRelativePath); + } else if (copyEntity instanceof PrePublishStep) { + PrePublishStep pre = (PrePublishStep) copyEntity; + Assert.assertTrue(pre.getStep() instanceof DeleteFileCommitStep); + // need to delete this file + Assert.assertTrue(pre.explain().indexOf("configBasedDatasetTest/dest/dir1/file1") > 0); + } else if (copyEntity instanceof PostPublishStep) { + PostPublishStep post = (PostPublishStep) copyEntity; + Assert.assertTrue(post.getStep() instanceof WatermarkMetadataGenerationCommitStep); + Assert.assertTrue( + post.explain().indexOf("dest/_metadata") > 0 && post.explain().indexOf("" + sourceWatermark) > 0); + } else { + throw new Exception("Wrong type"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java deleted file mode 100644 index 1965a41..0000000 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.data.management.copy.replication; - -import java.net.URI; -import java.util.Collection; -import java.util.Properties; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.Test; - -import com.google.common.base.Optional; -import com.google.common.collect.Sets; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.data.management.copy.CopyConfiguration; -import org.apache.gobblin.data.management.copy.CopyEntity; -import org.apache.gobblin.data.management.copy.CopyableFile; -import org.apache.gobblin.data.management.copy.PreserveAttributes; -import org.apache.gobblin.data.management.copy.entities.PostPublishStep; -import org.apache.gobblin.data.management.copy.entities.PrePublishStep; -import org.apache.gobblin.source.extractor.ComparableWatermark; -import org.apache.gobblin.source.extractor.extract.LongWatermark; -import org.apache.gobblin.util.FileListUtils; -import org.apache.gobblin.util.PathUtils; -import org.apache.gobblin.util.commit.DeleteFileCommitStep; - - -/** - * Unit test for {@link ConfigBasedDatasets} - * @author mitu - * - */ -@Test(groups = {"gobblin.data.management.copy.replication"}) - -public class ConfigBasedDatasetsTest { - - @Test - public void testGetCopyableFiles() throws Exception { - String sourceDir = getClass().getClassLoader().getResource("configBasedDatasetTest/src").getFile(); - String destinationDir = getClass().getClassLoader().getResource("configBasedDatasetTest/dest").getFile(); - FileSystem localFs = FileSystem.getLocal(new Configuration()); - URI local = localFs.getUri(); - long sourceWatermark = 100L; - - Properties properties = new Properties(); - properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/publisher"); - - CopyConfiguration copyConfiguration = - CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), properties).publishDir(new Path(destinationDir)) - .preserve(PreserveAttributes.fromMnemonicString("ugp")).build(); - - ReplicationMetaData mockMetaData = Mockito.mock(ReplicationMetaData.class); - Mockito.when(mockMetaData.toString()).thenReturn("Mock Meta Data"); - - ReplicationConfiguration mockRC = Mockito.mock(ReplicationConfiguration.class); - Mockito.when(mockRC.getCopyMode()).thenReturn(ReplicationCopyMode.PULL); - Mockito.when(mockRC.getMetaData()).thenReturn(mockMetaData); - - HadoopFsEndPoint copyFrom = Mockito.mock(HadoopFsEndPoint.class); - Mockito.when(copyFrom.getDatasetPath()).thenReturn(new Path(sourceDir)); - Mockito.when(copyFrom.getFsURI()).thenReturn(local); - ComparableWatermark sw = new LongWatermark(sourceWatermark); - Mockito.when(copyFrom.getWatermark()).thenReturn(Optional.of(sw)); - Mockito.when(copyFrom.getFiles()).thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(sourceDir))); - - HadoopFsEndPoint copyTo = Mockito.mock(HadoopFsEndPoint.class); - Mockito.when(copyTo.getDatasetPath()).thenReturn(new Path(destinationDir)); - Mockito.when(copyTo.getFsURI()).thenReturn(local); - Optional<ComparableWatermark>tmp = Optional.absent(); - Mockito.when(copyTo.getWatermark()).thenReturn(tmp); - Mockito.when(copyTo.getFiles()).thenReturn(FileListUtils.listFilesRecursively(localFs, new Path(destinationDir))); - - CopyRoute route = Mockito.mock(CopyRoute.class); - Mockito.when(route.getCopyFrom()).thenReturn(copyFrom); - Mockito.when(route.getCopyTo()).thenReturn(copyTo); - - ConfigBasedDataset dataset = new ConfigBasedDataset(mockRC, properties, route); - - Collection<? extends CopyEntity> copyableFiles = dataset.getCopyableFiles(localFs, copyConfiguration); - Assert.assertEquals(copyableFiles.size(), 6); - - Set<Path> paths = Sets.newHashSet(new Path("dir1/file2"), new Path("dir1/file1"), new Path("dir2/file1"), new Path("dir2/file3")); - for (CopyEntity copyEntity : copyableFiles) { - if(copyEntity instanceof CopyableFile) { - CopyableFile file = (CopyableFile) copyEntity; - Path originRelativePath = - PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getOrigin().getPath()), - PathUtils.getPathWithoutSchemeAndAuthority(new Path(sourceDir))); - Path targetRelativePath = - PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(file.getDestination()), - PathUtils.getPathWithoutSchemeAndAuthority(new Path(destinationDir))); - - Assert.assertTrue(paths.contains(originRelativePath)); - Assert.assertTrue(paths.contains(targetRelativePath)); - Assert.assertEquals(originRelativePath, targetRelativePath); - } - else if(copyEntity instanceof PrePublishStep){ - PrePublishStep pre = (PrePublishStep)copyEntity; - Assert.assertTrue(pre.getStep() instanceof DeleteFileCommitStep); - // need to delete this file - Assert.assertTrue(pre.explain().indexOf("configBasedDatasetTest/dest/dir1/file1") > 0); - } - else if(copyEntity instanceof PostPublishStep){ - PostPublishStep post = (PostPublishStep)copyEntity; - Assert.assertTrue(post.getStep() instanceof WatermarkMetadataGenerationCommitStep); - Assert.assertTrue(post.explain().indexOf("dest/_metadata") > 0 && post.explain().indexOf(""+sourceWatermark)>0); - } - else{ - throw new Exception("Wrong type"); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1 ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1 b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1 new file mode 100644 index 0000000..d87e628 --- /dev/null +++ b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file1 @@ -0,0 +1 @@ +_dir1:file1content http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11abf9f5/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2 ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2 b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2 new file mode 100644 index 0000000..248eb41 --- /dev/null +++ b/gobblin-data-management/src/test/resources/configBasedDatasetTest/src/_dir1/file2 @@ -0,0 +1 @@ +_dir1:file2content
