This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bc456b961 [GOBBLIN-1971] Allow `IcebergCatalog` to specify the
`DatasetDescriptor` name for the `IcebergTable`s it creates (#3842)
bc456b961 is described below
commit bc456b96166db375f2ec4a2066f67f96a023d861
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Dec 4 14:01:20 2023 -0800
[GOBBLIN-1971] Allow `IcebergCatalog` to specify the `DatasetDescriptor`
name for the `IcebergTable`s it creates (#3842)
* Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the
`IcebergTable`s it creates
* small method javadoc
---
.../data/management/copy/iceberg/BaseIcebergCatalog.java | 10 +++++++++-
.../gobblin/data/management/copy/iceberg/IcebergCatalog.java | 2 ++
.../gobblin/data/management/copy/iceberg/IcebergTable.java | 10 +++++++++-
.../data/management/copy/iceberg/IcebergDatasetTest.java | 5 +++--
4 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index 0ac4dcc0b..85c995844 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -42,12 +42,20 @@ public abstract class BaseIcebergCatalog implements IcebergCatalog {
@Override
public IcebergTable openTable(String dbName, String tableName) {
TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
- return new IcebergTable(tableId, createTableOperations(tableId), this.getCatalogUri());
+ return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), createTableOperations(tableId), this.getCatalogUri());
}
protected Catalog createCompanionCatalog(Map<String, String> properties,
Configuration configuration) {
return CatalogUtil.loadCatalog(this.companionCatalogClass.getName(),
this.catalogName, properties, configuration);
}
+ /**
+ * Enable catalog-specific qualification for charting lineage, etc. This default impl is an identity pass-through that adds no qualification.
+ * @return the name to use for the table identified by {@link TableIdentifier}
+ */
+ protected String calcDatasetDescriptorName(TableIdentifier tableId) {
+ return tableId.toString(); // default to FQ ID with both table namespace and name
+ }
+
protected abstract TableOperations createTableOperations(TableIdentifier tableId);
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 5794a4c03..68e9bb31c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -28,8 +28,10 @@ import org.apache.iceberg.catalog.TableIdentifier;
*/
public interface IcebergCatalog {
+ /** @return table identified by `dbName` and `tableName` */
IcebergTable openTable(String dbName, String tableName);
+ /** @return table identified by `tableId` */
default IcebergTable openTable(TableIdentifier tableId) {
// CHALLENGE: clearly better to implement in the reverse direction - `openTable(String, String)` in terms of `openTable(TableIdentifier)` -
// but challenging to do at this point, with multiple derived classes already "in the wild" that implement `openTable(String, String)`
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index a07f6ac89..24febe897 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -70,9 +71,16 @@ public class IcebergTable {
@Getter
private final TableIdentifier tableId;
+ /** allow the {@link IcebergCatalog} creating this table to qualify its name when used for lineage, etc. */
+ private final String datasetDescriptorName;
private final TableOperations tableOps;
private final String catalogUri;
+ @VisibleForTesting
+ IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri) {
+ this(tableId, tableId.toString(), tableOps, catalogUri);
+ }
+
/** @return metadata info limited to the most recent (current) snapshot */
public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
TableMetadata current = accessTableMetadata();
@@ -188,7 +196,7 @@ public class IcebergTable {
DatasetDescriptor descriptor = new DatasetDescriptor(
DatasetConstants.PLATFORM_ICEBERG,
URI.create(this.catalogUri),
- this.tableId.toString() // use FQ ID, including table namespace
+ this.datasetDescriptorName
);
descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
return descriptor;
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 0b485e1df..4b457923e 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -116,10 +116,11 @@ public class IcebergDatasetTest {
@Test
public void testGetDatasetDescriptor() throws URISyntaxException {
TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
- IcebergTable table = new IcebergTable(tableId, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
+ String qualifiedTableName = "foo_prefix." + tableId.toString();
+ IcebergTable table = new IcebergTable(tableId, qualifiedTableName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
FileSystem mockFs = Mockito.mock(FileSystem.class);
Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
- DatasetDescriptor expected = new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, URI.create(SRC_CATALOG_URI), tableId.toString());
+ DatasetDescriptor expected = new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, URI.create(SRC_CATALOG_URI), qualifiedTableName);
expected.addMetadata(DatasetConstants.FS_URI, SRC_FS_URI.toString());
Assert.assertEquals(table.getDatasetDescriptor(mockFs), expected);
}