This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7c8127c7aa [GOBBLIN-2141] Simulate directly on trash class (#4036)
7c8127c7aa is described below
commit 7c8127c7aa336f25ea2292346cede3756e3098f6
Author: William Lo <[email protected]>
AuthorDate: Fri Aug 23 14:09:08 2024 -0400
[GOBBLIN-2141] Simulate directly on trash class (#4036)
* Support Trash simulate mode without using MockTrash
* Use TrashFactory initialization on simulated trash classes
---
.../gobblin/data/management/trash/Trash.java | 52 ++++++++++----
.../data/management/trash/TrashFactory.java | 7 +-
.../gobblin/data/management/trash/TrashTest.java | 79 +++++++++++++++++++++-
3 files changed, 121 insertions(+), 17 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/trash/Trash.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/trash/Trash.java
index 3ac54b5d2e..48c85b6393 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/trash/Trash.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/trash/Trash.java
@@ -24,7 +24,6 @@ import java.util.Comparator;
import java.util.List;
import java.util.Properties;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,6 +39,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -52,6 +53,8 @@ public class Trash implements GobblinTrash {
private static final FsPermission ALL_PERM = new FsPermission(FsAction.ALL,
FsAction.ALL, FsAction.ALL);
public static final String TRASH_CLASS_KEY = "trash.class";
+ private final boolean simulate;
+
/**
* Location of trash directory in file system. The location can include a
token $USER that will be automatically
* replaced by the name of the active user.
@@ -106,7 +109,11 @@ public class Trash implements GobblinTrash {
throw new IllegalArgumentException("Trash location must be absolute.
Found " + trashLocation.toString());
}
Path qualifiedTrashLocation = fs.makeQualified(trashLocation);
- ensureTrashLocationExists(fs, qualifiedTrashLocation);
+ if (this.simulate) {
+ LOG.info("Simulating trash location creation at " +
qualifiedTrashLocation);
+ } else {
+ ensureTrashLocationExists(fs, qualifiedTrashLocation);
+ }
return qualifiedTrashLocation;
}
@@ -156,6 +163,7 @@ public class Trash implements GobblinTrash {
protected Trash(FileSystem fs, Properties props, String user) throws
IOException {
this.fs = fs;
+ this.simulate = PropertiesUtils.getPropAsBoolean(props,
TrashFactory.SIMULATE, "false");
this.trashLocation = createTrashLocation(fs, props, user);
try {
Class<?> snapshotCleanupPolicyClass =
Class.forName(props.getProperty(SNAPSHOT_CLEANUP_POLICY_CLASS_KEY,
@@ -189,11 +197,18 @@ public class Trash implements GobblinTrash {
Path targetPathInTrash = PathUtils.mergePaths(this.trashLocation,
fullyResolvedPath);
if (!this.fs.exists(targetPathInTrash.getParent())) {
- this.fs.mkdirs(targetPathInTrash.getParent());
+ if (this.simulate) {
+ LOG.info("Making a parent directory at " +
targetPathInTrash.getParent() + " in trash.");
+ } else {
+ this.fs.mkdirs(targetPathInTrash.getParent());
+ }
} else if (this.fs.exists(targetPathInTrash)) {
targetPathInTrash = targetPathInTrash.suffix("_" +
System.currentTimeMillis());
}
-
+ if (this.simulate) {
+ LOG.info("Simulating moving " + fullyResolvedPath + " to " +
targetPathInTrash + " in trash.");
+ return true;
+ }
return this.fs.rename(fullyResolvedPath, targetPathInTrash);
}
@@ -211,15 +226,16 @@ public class Trash implements GobblinTrash {
Path snapshotDir = new Path(this.trashLocation, new
DateTime().toString(TRASH_SNAPSHOT_NAME_FORMATTER));
if (this.fs.exists(snapshotDir)) {
- throw new IOException("New snapshot directory " + snapshotDir.toString()
+ " already exists.");
+ throw new IOException("New snapshot directory " + snapshotDir + "
already exists.");
}
-
- if (!safeFsMkdir(fs, snapshotDir, PERM)) {
- throw new IOException("Failed to create new snapshot directory at " +
snapshotDir.toString());
+ if (this.simulate) {
+ LOG.info("Simulating creation of new snapshot directory at " +
snapshotDir);
+ } else if (!safeFsMkdir(fs, snapshotDir, PERM)) {
+ throw new IOException("Failed to create new snapshot directory at " +
snapshotDir);
}
LOG.info(String.format("Moving %d paths in Trash directory to newly
created snapshot at %s.", pathsInTrash.length,
- snapshotDir.toString()));
+ snapshotDir));
int pathsFailedToMove = 0;
for (FileStatus fileStatus : pathsInTrash) {
@@ -227,7 +243,11 @@ public class Trash implements GobblinTrash {
Path targetPath = new Path(snapshotDir, pathRelativeToTrash);
boolean movedThisPath = true;
try {
- movedThisPath = this.fs.rename(fileStatus.getPath(), targetPath);
+ if (this.simulate) {
+ LOG.info("Simulating moving of " + fileStatus.getPath() + " to " +
targetPath + " in snapshot.");
+ } else {
+ movedThisPath = this.fs.rename(fileStatus.getPath(), targetPath);
+ }
} catch (IOException exception) {
LOG.error("Failed to move path " + fileStatus.getPath().toString() + "
to snapshot.", exception);
pathsFailedToMove += 1;
@@ -275,11 +295,15 @@ public class Trash implements GobblinTrash {
for (FileStatus snapshot : snapshotsInTrash) {
if (this.snapshotCleanupPolicy.shouldDeleteSnapshot(snapshot, this)) {
try {
- boolean successfullyDeleted = this.fs.delete(snapshot.getPath(),
true);
- if (successfullyDeleted) {
- snapshotsDeleted++;
+ if (this.simulate) {
+ LOG.info("Simulating delete of snapshot " + snapshot.getPath());
} else {
- LOG.error("Failed to delete snapshot " + snapshot.getPath());
+ boolean successfullyDeleted = this.fs.delete(snapshot.getPath(),
true);
+ if (successfullyDeleted) {
+ snapshotsDeleted++;
+ } else {
+ LOG.error("Failed to delete snapshot " + snapshot.getPath());
+ }
}
} catch (IOException exception) {
LOG.error("Failed to delete snapshot " + snapshot.getPath(),
exception);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/trash/TrashFactory.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/trash/TrashFactory.java
index faad1b4172..0ae2e42c4a 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/trash/TrashFactory.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/trash/TrashFactory.java
@@ -41,6 +41,9 @@ public class TrashFactory {
public static final String SIMULATE = "gobblin.trash.simulate";
public static final String SKIP_TRASH = "gobblin.trash.skip.trash";
+ // Configuration to avoid using MockTrash - as Trash implementations get
more complex it's better to have the Trash class itself support simulate
+ public static final String SIMULATE_USING_ACTUAL_TRASH =
"gobblin.trash.simulate.actual.trash";
+
public static Trash createTrash(FileSystem fs) throws IOException {
return createTrash(fs, new Properties());
}
@@ -107,8 +110,8 @@ public class TrashFactory {
LOG.info("Creating a test trash. Nothing will actually be deleted.");
return new TestTrash(fs, props, user);
}
- if(props.containsKey(SIMULATE) &&
Boolean.parseBoolean(props.getProperty(SIMULATE))) {
- LOG.info("Creating a simulate trash. Nothing will actually be deleted.");
+ if(props.containsKey(SIMULATE) &&
Boolean.parseBoolean(props.getProperty(SIMULATE)) &&
!Boolean.parseBoolean(props.getProperty(SIMULATE_USING_ACTUAL_TRASH)) ) {
+ LOG.info("Creating a mock trash. Nothing will actually be deleted.");
return new MockTrash(fs, props, user);
}
if(props.containsKey(SKIP_TRASH) &&
Boolean.parseBoolean(props.getProperty(SKIP_TRASH))) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/trash/TrashTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/trash/TrashTest.java
index be2691cef0..3372533cae 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/trash/TrashTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/trash/TrashTest.java
@@ -17,13 +17,13 @@
package org.apache.gobblin.data.management.trash;
-
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
@@ -241,4 +241,81 @@ public class TrashTest {
}
}
+ @Test
+ public void testMoveToTrashSimulate() throws IOException {
+ Properties properties = new Properties();
+ properties.setProperty(Trash.SNAPSHOT_CLEANUP_POLICY_CLASS_KEY,
TestCleanupPolicy.class.getCanonicalName());
+ properties.setProperty(TrashFactory.SIMULATE, "true");
+ properties.setProperty(TrashFactory.SIMULATE_USING_ACTUAL_TRASH, "true");
+ properties.setProperty(Trash.TRASH_LOCATION_KEY, "/trash/dir");
+ FileSystem mockTrash = mock(FileSystem.class);
+ when(mockTrash.makeQualified(any(Path.class))).thenReturn(new
Path("/trash/dir"));
+ Trash trash = TrashFactory.createTrash(mockTrash, properties);
+
+ Path pathToDelete = new Path("/path/to/delete");
+
+ final List<Pair<Path, Path>> movedPaths = Lists.newArrayList();
+
+ when(trash.fs.rename(any(Path.class), any(Path.class))).thenAnswer(new
Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation)
+ throws Throwable {
+ Object[] args = invocation.getArguments();
+ movedPaths.add(new Pair<Path, Path>((Path) args[0], (Path) args[1]));
+ return true;
+ }
+ });
+
+ Assert.assertTrue(trash.moveToTrash(pathToDelete));
+
+ verify(trash.fs, times(0)).mkdirs(any(Path.class));
+
+ Assert.assertEquals(movedPaths.size(), 0);
+
+ }
+
+ @Test
+ public void testPurgeSnapshotsWithSimulate() throws IOException {
+
+ try {
+ Properties properties = new Properties();
+ properties.setProperty(Trash.SNAPSHOT_CLEANUP_POLICY_CLASS_KEY,
TestCleanupPolicy.class.getCanonicalName());
+ properties.setProperty(TrashFactory.SIMULATE, "true");
+ properties.setProperty(TrashFactory.SIMULATE_USING_ACTUAL_TRASH, "true");
+ properties.setProperty(Trash.TRASH_LOCATION_KEY, "/trash/dir");
+ FileSystem mockTrash = mock(FileSystem.class);
+ when(mockTrash.makeQualified(any(Path.class))).thenReturn(new
Path("/trash/dir"));
+ Trash trash = TrashFactory.createTrash(mockTrash, properties);
+
+ DateTimeUtils.setCurrentMillisFixed(new DateTime(2015, 7, 15, 10,
0).withZone(DateTimeZone.UTC).getMillis());
+
+ final List<Path> deletedPaths = Lists.newArrayList();
+
+ Path snapshot1 = new Path(trash.getTrashLocation(),
Trash.TRASH_SNAPSHOT_NAME_FORMATTER.print(new DateTime()));
+ Path snapshot2 = new Path(trash.getTrashLocation(),
+ Trash.TRASH_SNAPSHOT_NAME_FORMATTER.print(new
DateTime().minusDays(1)));
+
+ when(trash.fs.listStatus(any(Path.class), any(PathFilter.class))).
+ thenReturn(
+ Lists.newArrayList(
+ new FileStatus(0, true, 0, 0, 0, snapshot1),
+ new FileStatus(0, true, 0, 0, 0, snapshot2))
+ .toArray(new FileStatus[]{}));
+ when(trash.fs.delete(any(Path.class), anyBoolean())).thenAnswer(new
Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation)
+ throws Throwable {
+ deletedPaths.add((Path) invocation.getArguments()[0]);
+ return true;
+ }
+ });
+
+ trash.purgeTrashSnapshots();
+
+ Assert.assertEquals(deletedPaths.size(), 0);
+ } finally {
+ DateTimeUtils.setCurrentMillisSystem();
+ }
+ }
+
}