[
https://issues.apache.org/jira/browse/GOBBLIN-1709?focusedWorklogId=809275&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-809275
]
ASF GitHub Bot logged work on GOBBLIN-1709:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 15/Sep/22 19:45
Start Date: 15/Sep/22 19:45
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3560:
URL: https://github.com/apache/gobblin/pull/3560#discussion_r972350695
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -53,37 +54,39 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
public List<IcebergDataset> findDatasets() throws IOException {
List<IcebergDataset> matchingDatasets = new ArrayList<>();
/*
- * Both Iceberg database name and table name are mandatory,
- * since we are currently only supporting Hive Catalog based Iceberg tables.
- * The design will support defaults and other catalogs in future releases.
+ * Both Iceberg database name and table name are mandatory based on current implementation.
Review Comment:
please put into method javadoc, to show up when reading in that tool
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -77,80 +74,112 @@ public void testGetFilePaths() throws IOException {
}
/**
- * Test case to copy all the file paths for a mocked iceberg table. This is a full copy overwriting everything on the destination
+ * Test case to generate copy entities for all the file paths for a mocked iceberg table.
+ * The assumption here is that we create copy entities for all the matching file paths,
+ * without calculating any difference between the source and destination
*/
@Test
public void testGenerateCopyEntitiesForTableFileSet() throws IOException, URISyntaxException {
Review Comment:
maybe rename so the test name tracks the comment above... such as `testGenerateCopyEntitiesWhenDestEmpty`?
(it's not so much that we wouldn't calculate a difference between src and destination, but that this initial, simplest test mocks a destination that has no data yet)
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -77,80 +74,112 @@ public void testGetFilePaths() throws IOException {
}
/**
- * Test case to copy all the file paths for a mocked iceberg table. This is a full copy overwriting everything on the destination
+ * Test case to generate copy entities for all the file paths for a mocked iceberg table.
+ * The assumption here is that we create copy entities for all the matching file paths,
+ * without calculating any difference between the source and destination
*/
@Test
public void testGenerateCopyEntitiesForTableFileSet() throws IOException, URISyntaxException {
FileSystem fs = Mockito.mock(FileSystem.class);
String test_db_name = "test_db_name";
String test_table_name = "test_tbl_name";
- Set<String> setOfFilePaths = new HashSet<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
-
+ String test_qualified_path = "/root/iceberg/test/destination/sub_path_destination";
+ String test_uri_path = "/root/iceberg/test/output";
Properties properties = new Properties();
properties.setProperty("data.publisher.final.dir", "/test");
+ List<String> expected = new ArrayList<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_LIST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
.preserve(PreserveAttributes.fromMnemonicString(""))
.copyContext(new CopyContext())
.build();
- TableOperations tableOperations = Mockito.mock(TableOperations.class);
- IcebergTable icebergTable = new MockedIcebergTable(tableOperations);
+ IcebergTable icebergTable = new MockedIcebergTable(METADATA_PATH, MANIFEST_PATH, MANIFEST_LIST_PATH, new ArrayList<>(Arrays.asList(MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2)));
IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
+ Path path1 = new Path(METADATA_PATH);
+ Path path2 = new Path(MANIFEST_PATH);
+ Path path3 = new Path(MANIFEST_LIST_PATH);
+ Path path4 = new Path(MANIFEST_FILE_PATH1);
+ Path path5 = new Path(MANIFEST_FILE_PATH2);
+
FileStatus fileStatus1 = new FileStatus();
- fileStatus1.setPath(new Path(METADATA_PATH));
+ fileStatus1.setPath(path1);
FileStatus fileStatus2 = new FileStatus();
- fileStatus2.setPath(new Path(MANIFEST_PATH));
+ fileStatus2.setPath(path2);
FileStatus fileStatus3 = new FileStatus();
- fileStatus3.setPath(new Path(MANIFEST_FILE_PATH1));
+ fileStatus3.setPath(path3);
FileStatus fileStatus4 = new FileStatus();
- fileStatus4.setPath(new Path(MANIFEST_FILE_PATH2));
-
- Path path1 = new Path(METADATA_PATH);
- Path path2 = new Path(MANIFEST_PATH);
- Path path3 = new Path(MANIFEST_FILE_PATH1);
- Path path4 = new Path(MANIFEST_FILE_PATH2);
+ fileStatus4.setPath(path4);
+ FileStatus fileStatus5 = new FileStatus();
+ fileStatus5.setPath(path5);
+ Map<Path, FileStatus> mapOfPathAndFileStatus = Maps.newHashMap();
+ mapOfPathAndFileStatus.put(path1, fileStatus1);
+ mapOfPathAndFileStatus.put(path2, fileStatus2);
+ mapOfPathAndFileStatus.put(path3, fileStatus3);
+ mapOfPathAndFileStatus.put(path4, fileStatus4);
+ mapOfPathAndFileStatus.put(path5, fileStatus5);
Review Comment:
these 20+ lines for 5 file paths seem like too much repetition and boilerplate. could we invent an abstraction, like `MockDestinationFileSystem`, and give it methods:
```
MockDestinationFileSystem addPath(String p); // empty `FileStatus` (indicating not found)
MockDestinationFileSystem addPath(String p, FileStatus fstat);
```
?
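The reviewer's suggested helper could look something like the sketch below. `MockDestinationFileSystem` and its `addPath` methods are the reviewer's proposed (hypothetical) names, not existing Gobblin classes, and to keep the sketch self-contained a minimal stand-in `FileStatus` replaces Hadoop's `org.apache.hadoop.fs.FileStatus`:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Hadoop's FileStatus, so the sketch runs without dependencies.
class FileStatus {
    private String path;
    void setPath(String p) { this.path = p; }
    String getPath() { return path; }
}

// Hypothetical test helper: collects the per-path mocking in one fluent builder
// instead of repeating Path/FileStatus boilerplate for every file.
class MockDestinationFileSystem {
    private final Map<String, FileStatus> statusByPath = new HashMap<>();

    // Register a path with an empty FileStatus (indicating not found at the destination).
    MockDestinationFileSystem addPath(String p) {
        return addPath(p, new FileStatus());
    }

    // Register a path with an explicit FileStatus.
    MockDestinationFileSystem addPath(String p, FileStatus fstat) {
        fstat.setPath(p);
        statusByPath.put(p, fstat);
        return this;
    }

    FileStatus getFileStatus(String p) { return statusByPath.get(p); }

    int size() { return statusByPath.size(); }
}

public class MockDestinationFileSystemDemo {
    public static void main(String[] args) {
        MockDestinationFileSystem mockFs = new MockDestinationFileSystem()
            .addPath("/metadata.json")
            .addPath("/manifest-list.avro")
            .addPath("/manifest.avro");
        System.out.println(mockFs.size());                                     // 3
        System.out.println(mockFs.getFileStatus("/metadata.json").getPath());  // /metadata.json
    }
}
```

In the real test, the map inside the helper would drive the `Mockito.when(fs.getFileStatus(...))` stubbing in one loop rather than five hand-written statements.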
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableFileSet.java:
##########
@@ -21,20 +21,26 @@
import java.util.Collection;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.partition.FileSet;
+
/**
- * A {@link IcebergFileSet} that generates {@link CopyEntity}s for a Hive Catalog based Iceberg table
+ * A {@link IcebergTableFileSet} that generates {@link CopyEntity}s for a Hive Catalog based Iceberg table
+ * A {@link FileSet} for Iceberg datasets containing information associated with an Iceberg table.
*/
-public class IcebergTableFileSet extends IcebergFileSet{
+public class IcebergTableFileSet extends FileSet<CopyEntity> {
private final CopyConfiguration copyConfiguration;
+ private final IcebergDataset icebergDataset;
+
public IcebergTableFileSet(String name, IcebergDataset icebergDataset, CopyConfiguration configuration) {
super(name, icebergDataset);
this.copyConfiguration = configuration;
+ this.icebergDataset = icebergDataset;
}
@Override
protected Collection<CopyEntity> generateCopyEntities() throws IOException {
- return this.getIcebergDataset().generateCopyEntitiesForTableFileSet(this.copyConfiguration);
+ return this.icebergDataset.generateCopyEntitiesForTableFileSet(this.copyConfiguration);
Review Comment:
the more I see this, the more I'd prefer to shorten the name to `IcebergDataset.generateCopyEntities`. whaddya say?
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -53,37 +54,39 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
public List<IcebergDataset> findDatasets() throws IOException {
List<IcebergDataset> matchingDatasets = new ArrayList<>();
/*
- * Both Iceberg database name and table name are mandatory,
- * since we are currently only supporting Hive Catalog based Iceberg tables.
- * The design will support defaults and other catalogs in future releases.
+ * Both Iceberg database name and table name are mandatory based on current implementation.
+ * Later we may explore supporting datasets similar to Hive
*/
- if (properties.getProperty(ICEBERG_DB_NAME) == null || properties.getProperty(ICEBERG_TABLE_NAME) == null) {
- throw new IOException("Iceberg database name or Iceberg table name is missing");
+ if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
+ throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing", ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
}
this.dbName = properties.getProperty(ICEBERG_DB_NAME);
this.tblName = properties.getProperty(ICEBERG_TABLE_NAME);
Configuration configuration = HadoopUtils.getConfFromProperties(properties);
IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
- IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
- // Currently, we only support one dataset per iceberg table
- matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergTable, properties, fs));
+ /* Currently, we only support one dataset per iceberg table
+ * Error handling and verification of table existence will be included as part of IcebergTable.getCurrentSnapshotInfo() in future releases.
Review Comment:
not a future release. merely it will happen later on (during this same execution).
thinking more on it, this may be the best time to indicate to the user that the db and table names they gave don't indicate a known iceberg table--kind of a pre-verification. either quickly add or else leave a todo... but seems you could use:
```
Catalog.listNamespaces(dbName)
Catalog.listTables(namespace)
```
to probe for that.
the last return value isn't ideal, and I see no better alternative than to sequentially scan the `List<String>` returned to look for `tblName`
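A minimal sketch of the suggested pre-verification. The catalog listing is mocked here with a fixed `List<String>`; in the real finder it would come from the catalog calls the reviewer names, and `verifyTableExists` is a hypothetical helper, not an existing Gobblin method:

```java
import java.util.Arrays;
import java.util.List;

public class TablePreVerificationDemo {

    // Sequentially scan the catalog's table listing for the requested table,
    // failing fast with a user-facing error if it is not a known iceberg table.
    static void verifyTableExists(List<String> tablesInDb, String dbName, String tblName) {
        if (!tablesInDb.contains(tblName)) {
            throw new IllegalArgumentException(
                String.format("Table '%s.%s' is not a known iceberg table", dbName, tblName));
        }
    }

    public static void main(String[] args) {
        // Mocked catalog result for the db namespace.
        List<String> listing = Arrays.asList("tbl_a", "test_tbl_name");
        verifyTableExists(listing, "test_db_name", "test_tbl_name");  // passes silently
        try {
            verifyTableExists(listing, "test_db_name", "missing_tbl");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The linear `contains` scan matches the reviewer's observation that, given the listing's return type, there is no better alternative than checking each entry.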
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -77,80 +74,112 @@ public void testGetFilePaths() throws IOException {
}
/**
- * Test case to copy all the file paths for a mocked iceberg table. This is a full copy overwriting everything on the destination
+ * Test case to generate copy entities for all the file paths for a mocked iceberg table.
+ * The assumption here is that we create copy entities for all the matching file paths,
+ * without calculating any difference between the source and destination
*/
@Test
public void testGenerateCopyEntitiesForTableFileSet() throws IOException, URISyntaxException {
FileSystem fs = Mockito.mock(FileSystem.class);
String test_db_name = "test_db_name";
String test_table_name = "test_tbl_name";
- Set<String> setOfFilePaths = new HashSet<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
-
+ String test_qualified_path = "/root/iceberg/test/destination/sub_path_destination";
+ String test_uri_path = "/root/iceberg/test/output";
Properties properties = new Properties();
properties.setProperty("data.publisher.final.dir", "/test");
+ List<String> expected = new ArrayList<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_LIST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
.preserve(PreserveAttributes.fromMnemonicString(""))
.copyContext(new CopyContext())
.build();
- TableOperations tableOperations = Mockito.mock(TableOperations.class);
- IcebergTable icebergTable = new MockedIcebergTable(tableOperations);
+ IcebergTable icebergTable = new MockedIcebergTable(METADATA_PATH, MANIFEST_PATH, MANIFEST_LIST_PATH, new ArrayList<>(Arrays.asList(MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2)));
IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
+ Path path1 = new Path(METADATA_PATH);
+ Path path2 = new Path(MANIFEST_PATH);
+ Path path3 = new Path(MANIFEST_LIST_PATH);
+ Path path4 = new Path(MANIFEST_FILE_PATH1);
+ Path path5 = new Path(MANIFEST_FILE_PATH2);
+
FileStatus fileStatus1 = new FileStatus();
- fileStatus1.setPath(new Path(METADATA_PATH));
+ fileStatus1.setPath(path1);
FileStatus fileStatus2 = new FileStatus();
- fileStatus2.setPath(new Path(MANIFEST_PATH));
+ fileStatus2.setPath(path2);
FileStatus fileStatus3 = new FileStatus();
- fileStatus3.setPath(new Path(MANIFEST_FILE_PATH1));
+ fileStatus3.setPath(path3);
FileStatus fileStatus4 = new FileStatus();
- fileStatus4.setPath(new Path(MANIFEST_FILE_PATH2));
-
- Path path1 = new Path(METADATA_PATH);
- Path path2 = new Path(MANIFEST_PATH);
- Path path3 = new Path(MANIFEST_FILE_PATH1);
- Path path4 = new Path(MANIFEST_FILE_PATH2);
+ fileStatus4.setPath(path4);
+ FileStatus fileStatus5 = new FileStatus();
+ fileStatus5.setPath(path5);
+ Map<Path, FileStatus> mapOfPathAndFileStatus = Maps.newHashMap();
+ mapOfPathAndFileStatus.put(path1, fileStatus1);
+ mapOfPathAndFileStatus.put(path2, fileStatus2);
+ mapOfPathAndFileStatus.put(path3, fileStatus3);
+ mapOfPathAndFileStatus.put(path4, fileStatus4);
+ mapOfPathAndFileStatus.put(path5, fileStatus5);
- Mockito.when(fs.makeQualified(any(Path.class))).thenReturn(new Path("/root/iceberg/test/destination/sub_path_destination"));
- Mockito.when(fs.getFileStatus(path1)).thenReturn(fileStatus1);
- Mockito.when(fs.getFileStatus(path2)).thenReturn(fileStatus2);
- Mockito.when(fs.getFileStatus(path3)).thenReturn(fileStatus3);
- Mockito.when(fs.getFileStatus(path4)).thenReturn(fileStatus4);
- Mockito.when(fs.getUri()).thenReturn(new URI(null, null, "/root/iceberg/test/output", null));
+ mockFileSystemMethodCalls(fs, mapOfPathAndFileStatus, test_qualified_path, test_uri_path);
Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntitiesForTableFileSet(copyConfiguration);
- Assert.assertEquals(copyEntities.size(), setOfFilePaths.size());
+ verifyCopyEntities(copyEntities, expected);
+
+ }
+
+ private void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
+ List<String> actual = new ArrayList<>();
for (CopyEntity copyEntity : copyEntities) {
String json = copyEntity.toString();
JsonObject jsonObject = new Gson().fromJson(json, JsonObject.class);
JsonObject objectData = jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
JsonObject pathObject = objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
String filepath = pathObject.getAsJsonPrimitive("object-data").getAsString();
- Assert.assertTrue(setOfFilePaths.contains(filepath));
- setOfFilePaths.remove(filepath);
+ actual.add(filepath);
+ }
+ Assert.assertEquals(actual.size(), expected.size());
+ Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
+ }
+
+ private void mockFileSystemMethodCalls(FileSystem fs, Map<Path, FileStatus> pathToFileStatus, String qualifiedPath, String uriPath)
+ throws URISyntaxException, IOException {
+
+ Mockito.when(fs.getUri()).thenReturn(new URI(null, null, uriPath, null));
+ for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
+ Path path = entry.getKey();
+ FileStatus fileStatus = entry.getValue();
+ Mockito.when(fs.getFileStatus(path)).thenReturn(fileStatus);
+ Mockito.when(fs.makeQualified(path)).thenReturn(new Path(qualifiedPath));
}
- Assert.assertTrue(setOfFilePaths.isEmpty());
}
private static class MockedIcebergTable extends IcebergTable {
- public MockedIcebergTable(TableOperations tableOps) {
- super(tableOps);
+ String metadataPath;
+ String manifestListPath;
+ String manifestFilePath;
+ List<String> manifestListFilePaths;
Review Comment:
this mocking only works for a single manifest file (e.g. when the manifest list has only one entry). let's generalize to the full model, which is a list of manifest files, each with their own list of listed (data) file paths.
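One possible shape for the generalized mock, sketched with hypothetical names (`MockedManifestFile`, `allPaths`) rather than Gobblin's actual classes: the manifest list holds many manifest files, and each manifest file carries its own data file paths:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// One entry of the manifest list: a manifest file plus the data files it lists.
class MockedManifestFile {
    final String manifestFilePath;
    final List<String> listedDataFilePaths;

    MockedManifestFile(String manifestFilePath, List<String> listedDataFilePaths) {
        this.manifestFilePath = manifestFilePath;
        this.listedDataFilePaths = listedDataFilePaths;
    }
}

// Full Iceberg snapshot model: metadata file, manifest list, N manifest files.
class MockedIcebergTableModel {
    final String metadataPath;
    final String manifestListPath;
    final List<MockedManifestFile> manifestFiles;

    MockedIcebergTableModel(String metadataPath, String manifestListPath,
                            List<MockedManifestFile> manifestFiles) {
        this.metadataPath = metadataPath;
        this.manifestListPath = manifestListPath;
        this.manifestFiles = manifestFiles;
    }

    // Flatten every path a copy would need: metadata, manifest list,
    // each manifest file, and each data file it lists.
    List<String> allPaths() {
        List<String> paths = new ArrayList<>(Arrays.asList(metadataPath, manifestListPath));
        for (MockedManifestFile mf : manifestFiles) {
            paths.add(mf.manifestFilePath);
            paths.addAll(mf.listedDataFilePaths);
        }
        return paths;
    }
}

public class GeneralizedMockDemo {
    public static void main(String[] args) {
        MockedIcebergTableModel table = new MockedIcebergTableModel(
            "/metadata.json", "/manifest-list.avro",
            Arrays.asList(
                new MockedManifestFile("/manifest-1.avro", Arrays.asList("/data-1.orc", "/data-2.orc")),
                new MockedManifestFile("/manifest-2.avro", Arrays.asList("/data-3.orc"))));
        System.out.println(table.allPaths().size());  // 2 + 3 + 2 = 7
    }
}
```

With this model the test can assert against the flattened path list for any number of manifest files, instead of hard-coding exactly one manifest file path.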
Issue Time Tracking
-------------------
Worklog Id: (was: 809275)
Time Spent: 4h 50m (was: 4h 40m)
> Create work units for Hive Catalog based Iceberg Datasets to support Distcp
> for Iceberg
> ---------------------------------------------------------------------------------------
>
> Key: GOBBLIN-1709
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1709
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: distcp-ng
> Reporter: Meeth Gala
> Assignee: Issac Buenrostro
> Priority: Major
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> We want to support Distcp for Iceberg based datasets.
> As a pilot, we are starting with Hive Catalog and will expand the
> functionality to cover all Iceberg based datasets.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)