This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fbc11ce21 Correct semantics of `IcebergDatasetTest` and streamline
both impl and test code (#3571)
fbc11ce21 is described below
commit fbc11ce21712c057b28b4f14f63012fa8423e462
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Sep 29 15:08:24 2022 -0700
Correct semantics of `IcebergDatasetTest` and streamline both impl and test
code (#3571)
---
.../management/copy/iceberg/IcebergDataset.java | 66 +++----
.../copy/iceberg/IcebergDatasetTest.java | 220 +++++++++++++--------
2 files changed, 165 insertions(+), 121 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index ae6e1aaea..1886689a1 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -27,21 +27,19 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.jetbrains.annotations.NotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import lombok.Data;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
@@ -83,15 +81,6 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
this.targetCatalogMetastoreURI = getAsOptionalURI(this.properties,
TARGET_METASTORE_URI_KEY);
}
- /**
- * Represents a source {@link FileStatus} and a {@link Path} destination.
- */
- @Data
- private static class SourceAndDestination {
- private final FileStatus source;
- private final Path destination;
- }
-
@Override
public String datasetURN() {
return this.getFileSetId();
@@ -135,15 +124,27 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
* table replication.
*/
@VisibleForTesting
- Collection<CopyEntity> generateCopyEntities(FileSystem targetFs,
CopyConfiguration configuration) throws IOException {
+ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs,
CopyConfiguration copyConfig) throws IOException {
String fileSet = this.getFileSetId();
List<CopyEntity> copyEntities = Lists.newArrayList();
Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
log.info("{}.{} - found {} candidate source paths", dbName,
inputTableName, pathToFileStatus.size());
- for (CopyableFile.Builder builder :
getCopyableFilesFromPaths(pathToFileStatus, configuration, targetFs)) {
- CopyableFile fileEntity =
-
builder.fileSet(fileSet).datasetOutputPath(targetFs.getUri().getPath()).build();
+ Configuration defaultHadoopConfiguration = new Configuration();
+ for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
+ Path srcPath = entry.getKey();
+ FileStatus srcFileStatus = entry.getValue();
+ // TODO: determine whether it is unnecessarily expensive to repeatedly
re-create what should be the same FS: could it
+ // instead be created once and reused thereafter?
+ FileSystem actualSourceFs =
getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+ // TODO: Add preservation of ancestor ownership and permissions!
+
+ CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+ actualSourceFs, srcFileStatus, targetFs.makeQualified(srcPath),
copyConfig)
+ .fileSet(fileSet)
+ .datasetOutputPath(targetFs.getUri().getPath())
+ .build();
fileEntity.setSourceData(getSourceDataset(this.sourceFs));
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);
@@ -152,29 +153,6 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
return copyEntities;
}
- /**
- * Get builders for a {@link CopyableFile} for each file path
- */
- protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path,
FileStatus> pathToFileStatus, CopyConfiguration configuration, FileSystem
targetFs) throws IOException {
-
- List<CopyableFile.Builder> builders = Lists.newArrayList();
- List<SourceAndDestination> dataFiles = Lists.newArrayList();
- Configuration defaultHadoopConfiguration = new Configuration();
- FileSystem actualSourceFs;
-
- for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
- dataFiles.add(new SourceAndDestination(entry.getValue(),
targetFs.makeQualified(entry.getKey())));
- }
-
- for (SourceAndDestination sourceAndDestination : dataFiles) {
- actualSourceFs =
sourceAndDestination.getSource().getPath().getFileSystem(defaultHadoopConfiguration);
-
- // TODO: Add ancestor owner and permissions in future releases
- builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs,
sourceAndDestination.getSource(),
- sourceAndDestination.getDestination(), configuration));
- }
- return builders;
- }
/**
* Finds all files of the Iceberg's current snapshot
* Returns a map of path, file status for each file that needs to be copied
@@ -201,6 +179,11 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
return result;
}
+ /** Add layer of indirection to permit test mocking by working around
`FileSystem.get()` `static` method */
+ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus
fileStatus, Configuration hadoopConfig) throws IOException {
+ return fileStatus.getPath().getFileSystem(hadoopConfig);
+ }
+
protected static Optional<URI> getAsOptionalURI(Properties props, String
key) {
return Optional.ofNullable(props.getProperty(key)).map(URI::create);
}
@@ -213,7 +196,6 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
return getDatasetDescriptor(targetCatalogMetastoreURI, targetFs);
}
- @NotNull
private DatasetDescriptor getDatasetDescriptor(Optional<URI>
catalogMetastoreURI, FileSystem fs) {
DatasetDescriptor descriptor = new DatasetDescriptor(
DatasetConstants.PLATFORM_ICEBERG,
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 0f7fd491d..61aaf6851 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.data.management.copy.iceberg;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -24,15 +25,18 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Collectors;
import lombok.Data;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -42,7 +46,6 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.api.client.util.Maps;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -51,10 +54,25 @@ import
org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyContext;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.testng.collections.Sets;
+import static org.mockito.Matchers.any;
+
+/** Tests for {@link
org.apache.gobblin.data.management.copy.iceberg.IcebergDataset} */
public class IcebergDatasetTest {
+ private static final URI SRC_FS_URI;
+ private static final URI DEST_FS_URI;
+ static {
+ try {
+ SRC_FS_URI = new URI("abc", "the.source.org", "/", null);
+ DEST_FS_URI = new URI("xyz", "the.dest.org", "/", null);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("should not occur!", e);
+ }
+ }
+
private static final String ROOT_PATH = "/root/iceberg/test/";
private static final String METADATA_PATH = ROOT_PATH +
"metadata/metadata.json";
private static final String MANIFEST_LIST_PATH_0 = ROOT_PATH +
"metadata/manifest_list.x";
@@ -76,38 +94,33 @@ public class IcebergDatasetTest {
MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A,
MANIFEST_DATA_PATH_1B)))
);
- private final String test_db_name = "test_db_name";
- private final String test_table_name = "test_tbl_name";
- private final String test_qualified_path =
"/root/iceberg/test/destination/sub_path_destination";
- private final String test_uri_path = "/root/iceberg/test/output";
- private final Properties properties = new Properties();
+ private final String testDbName = "test_db_name";
+ private final String testTblName = "test_tbl_name";
+ private final Properties copyConfigProperties = new Properties();
@BeforeClass
public void setUp() throws Exception {
- properties.setProperty("data.publisher.final.dir", "/test");
+ copyConfigProperties.setProperty("data.publisher.final.dir", "/test");
}
@Test
public void testGetFilePaths() throws IOException {
-
- List<String> pathsToCopy = Lists.newArrayList(MANIFEST_DATA_PATH_0A,
MANIFEST_DATA_PATH_0B);
- Map<Path, FileStatus> expected = Maps.newHashMap();
- expected.put(new Path(MANIFEST_DATA_PATH_0A), null);
- expected.put(new Path(MANIFEST_DATA_PATH_0B), null);
-
IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
- FileSystem fs = Mockito.mock(FileSystem.class);
- IcebergSnapshotInfo icebergSnapshotInfo =
Mockito.mock(IcebergSnapshotInfo.class);
-
+ IcebergSnapshotInfo icebergSnapshotInfo =
SNAPSHOT_PATHS_0.asSnapshotInfo();
Mockito.when(icebergTable.getIncrementalSnapshotInfosIterator()).thenReturn(Arrays.asList(icebergSnapshotInfo).iterator());
- Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
- Mockito.when(icebergSnapshotInfo.getSnapshotId()).thenReturn(98765L);
-
Mockito.when(icebergSnapshotInfo.getMetadataPath()).thenReturn(Optional.of("path
for log message"));
- IcebergDataset icebergDataset = new IcebergDataset("test_db_name",
"test_tbl_name", icebergTable, new Properties(), fs);
+ FileSystem sourceFs = Mockito.mock(FileSystem.class);
+ IcebergDataset icebergDataset = new IcebergDataset("test_db_name",
"test_tbl_name", icebergTable, new Properties(), sourceFs);
+
+ Set<Path> expectedPaths = Sets.newHashSet();
+ for (String p : icebergSnapshotInfo.getAllPaths()) {
+ expectedPaths.add(new Path(p));
+ }
- Map<Path, FileStatus> actual = icebergDataset.getFilePathsToFileStatus();
- Assert.assertEquals(actual, expected);
+ Map<Path, FileStatus> filePathsToFileStatus =
icebergDataset.getFilePathsToFileStatus();
+ Assert.assertEquals(filePathsToFileStatus.keySet(), expectedPaths);
+ // verify all values are `null` (because `sourceFs.getFileStatus` is not mocked)
+ Assert.assertEquals(Sets.newHashSet(filePathsToFileStatus.values()), new
HashSet<>(Arrays.asList(new FileStatus[] { null })));
}
/**
@@ -116,46 +129,50 @@ public class IcebergDatasetTest {
* without calculating any difference between the source and destination
*/
@Test
- public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException,
URISyntaxException {
+ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException {
List<String> expectedPaths = Arrays.asList(METADATA_PATH,
MANIFEST_LIST_PATH_0,
MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
- FileSystem fs = Mockito.mock(FileSystem.class);
+ MockFileSystemBuilder sourceBuilder = new
MockFileSystemBuilder(SRC_FS_URI);
+ sourceBuilder.addPaths(expectedPaths);
+ FileSystem sourceFs = sourceBuilder.build();
+
IcebergTable icebergTable = new
MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_0));
- IcebergDataset icebergDataset = new IcebergDataset(test_db_name,
test_table_name, icebergTable, new Properties(), fs);
- DestinationFileSystem destinationFileSystem = new DestinationFileSystem();
- destinationFileSystem.addPaths(expectedPaths);
+ IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
- mockFileSystemMethodCalls(fs, destinationFileSystem.pathToFileStatus,
test_qualified_path, test_uri_path);
+ MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+ FileSystem destFs = destBuilder.build();
- CopyConfiguration copyConfiguration = CopyConfiguration.builder(null,
properties)
+ CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs,
copyConfigProperties)
.preserve(PreserveAttributes.fromMnemonicString(""))
.copyContext(new CopyContext())
.build();
- Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(fs, copyConfiguration);
+ Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
verifyCopyEntities(copyEntities, expectedPaths);
}
/** Test generating copy entities for a multi-snapshot iceberg; given empty
dest, src-dest delta will be entirety */
@Test
- public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws
IOException, URISyntaxException {
+ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws
IOException {
List<String> expectedPaths = Arrays.asList(METADATA_PATH,
MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A,
MANIFEST_DATA_PATH_0B,
MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A,
MANIFEST_DATA_PATH_1B);
- FileSystem fs = Mockito.mock(FileSystem.class);
+ MockFileSystemBuilder sourceBuilder = new
MockFileSystemBuilder(SRC_FS_URI);
+ sourceBuilder.addPaths(expectedPaths);
+ FileSystem sourceFs = sourceBuilder.build();
+
IcebergTable icebergTable = new
MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
- IcebergDataset icebergDataset = new IcebergDataset(test_db_name,
test_table_name, icebergTable, new Properties(), fs);
- DestinationFileSystem destinationFileSystem = new DestinationFileSystem();
- destinationFileSystem.addPaths(expectedPaths);
+ IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
- mockFileSystemMethodCalls(fs, destinationFileSystem.pathToFileStatus,
test_qualified_path, test_uri_path);
+ MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+ FileSystem destFs = destBuilder.build();
- CopyConfiguration copyConfiguration = CopyConfiguration.builder(null,
properties)
+ CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs,
copyConfigProperties)
.preserve(PreserveAttributes.fromMnemonicString(""))
.copyContext(new CopyContext())
.build();
- Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(fs, copyConfiguration);
+ Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
verifyCopyEntities(copyEntities, expectedPaths);
}
@@ -175,18 +192,87 @@ public class IcebergDatasetTest {
Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
}
- private void mockFileSystemMethodCalls(FileSystem fs, Map<Path, FileStatus>
pathToFileStatus, String qualifiedPath, String uriPath)
- throws URISyntaxException, IOException {
- Mockito.when(fs.getUri()).thenReturn(new URI(null, null, uriPath, null));
- for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
- Path path = entry.getKey();
- FileStatus fileStatus = entry.getValue();
- Mockito.when(fs.getFileStatus(path)).thenReturn(fileStatus);
- Mockito.when(fs.makeQualified(path)).thenReturn(new Path(qualifiedPath));
+ /**
+ * Sadly, this is needed to avoid losing the `FileSystem` mock to replacement
by the `static` method `FileSystem.get`.
+ * Without this, the mock would be lost and we'd be unable to set up any source
paths as existing.
+ */
+ protected static class TrickIcebergDataset extends IcebergDataset {
+ public TrickIcebergDataset(String db, String table, IcebergTable
icebergTbl, Properties properties, FileSystem sourceFs) {
+ super(db, table, icebergTbl, properties, sourceFs);
+ }
+
+ @Override // as the `static` is not mock-able
+ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus
fileStatus, Configuration hadoopConfig) throws IOException {
+ return this.sourceFs;
+ }
+ };
+
+
+ /** Builds a {@link FileSystem} mock */
+ protected static class MockFileSystemBuilder {
+ private final URI fsURI;
+ /** when not `.isPresent()`, all paths exist; when `.get().isEmpty()`,
none exist; else only those indicated do */
+ private final Optional<Set<Path>> optPaths;
+
+ public MockFileSystemBuilder(URI fsURI) {
+ this(fsURI, false);
+ }
+ public MockFileSystemBuilder(URI fsURI, boolean shouldRepresentEveryPath) {
+ this.fsURI = fsURI;
+ this.optPaths = shouldRepresentEveryPath ? Optional.empty() :
Optional.of(Sets.newHashSet());
+ }
+
+ public Optional<Set<Path>> getPaths() {
+ return this.optPaths.map(Sets::newHashSet); // copy before returning
+ }
+
+ public void addPaths(List<String> pathStrings) {
+ for (String pathString : pathStrings) {
+ addPath(pathString);
+ }
+ }
+
+ public void addPath(String pathString) {
+ addPath(new Path(pathString));
+ }
+
+ public void addPath(Path path) {
+ if (!this.optPaths.isPresent()) {
+ throw new IllegalStateException("unable to add paths when constructed
with `shouldRepresentEveryPath == true`");
+ }
+ if (this.optPaths.get().add(path) && !path.isRoot()) { // recursively
add ancestors of a previously unknown path
+ addPath(path.getParent());
+ }
+ }
+
+ public FileSystem build() throws IOException {
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ Mockito.when(fs.getUri()).thenReturn(fsURI);
+ Mockito.when(fs.makeQualified(any(Path.class)))
+ .thenAnswer(invocation -> invocation.getArgumentAt(0,
Path.class).makeQualified(fsURI, new Path("/")));
+
+ if (!this.optPaths.isPresent()) {
+ Mockito.when(fs.getFileStatus(any(Path.class))).thenAnswer(invocation
->
+ createEmptyFileStatus(invocation.getArgumentAt(0,
Path.class).toString()));
+ } else {
+ Mockito.when(fs.getFileStatus(any(Path.class))).thenThrow(new
FileNotFoundException());
+ for (Path p : this.optPaths.get()) {
+
Mockito.doReturn(createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p);
+ }
+ }
+ return fs;
+ }
+
+ protected static FileStatus createEmptyFileStatus(String pathString)
throws IOException {
+ Path path = new Path(pathString);
+ FileStatus fileStatus = new FileStatus();
+ fileStatus.setPath(path);
+ return fileStatus;
}
}
+
private static class MockedIcebergTable extends IcebergTable {
@Data
@@ -194,6 +280,14 @@ public class IcebergDatasetTest {
private final Optional<String> metadataPath;
private final String manifestListPath;
private final List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+
+ public IcebergSnapshotInfo asSnapshotInfo() {
+ return asSnapshotInfo(0L, Instant.ofEpochMilli(0L));
+ }
+
+ public IcebergSnapshotInfo asSnapshotInfo(Long snapshotId, Instant
timestamp) {
+ return new IcebergSnapshotInfo(snapshotId, timestamp,
this.metadataPath, this.manifestListPath, this.manifestFiles);
+ }
}
private final List<SnapshotPaths> snapshotPathsList;
@@ -205,43 +299,11 @@ public class IcebergDatasetTest {
@Override
public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() {
- Long snapshotId = 0L;
- Instant timestamp = Instant.ofEpochMilli(0L);
List<IcebergSnapshotInfo> snapshotInfos = snapshotPathsList.stream()
- .map(snapshotPaths -> createSnapshotInfo(snapshotPaths, snapshotId,
timestamp))
+ .map(SnapshotPaths::asSnapshotInfo)
.collect(Collectors.toList());
return snapshotInfos.iterator();
}
-
- private IcebergSnapshotInfo createSnapshotInfo(SnapshotPaths
snapshotPaths, Long snapshotId, Instant timestamp) {
- return new IcebergSnapshotInfo(snapshotId, timestamp,
snapshotPaths.metadataPath, snapshotPaths.manifestListPath,
snapshotPaths.manifestFiles);
- }
- }
-
- private static class DestinationFileSystem {
- Map<Path, FileStatus> pathToFileStatus;
-
- public DestinationFileSystem() {
- this.pathToFileStatus = Maps.newHashMap();
- }
-
- public void addPaths(List<String> pathStrings) {
- for (String pathString : pathStrings) {
- addPath(pathString);
- }
- }
-
- public void addPath(String pathString) {
- Path path = new Path(pathString);
- FileStatus fileStatus = new FileStatus();
- fileStatus.setPath(path);
- this.pathToFileStatus.put(path, fileStatus);
- }
-
- public void addPath(String pathString, FileStatus fileStatus) {
- Path path = new Path(pathString);
- this.pathToFileStatus.put(path, fileStatus);
- }
}
}