This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bc456b961 [GOBBLIN-1971] Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the `IcebergTable`s it creates (#3842)
bc456b961 is described below

commit bc456b96166db375f2ec4a2066f67f96a023d861
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Dec 4 14:01:20 2023 -0800

    [GOBBLIN-1971] Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the `IcebergTable`s it creates (#3842)
    
    * Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the `IcebergTable`s it creates
    
    * small method javadoc
---
 .../data/management/copy/iceberg/BaseIcebergCatalog.java       | 10 +++++++++-
 .../gobblin/data/management/copy/iceberg/IcebergCatalog.java   |  2 ++
 .../gobblin/data/management/copy/iceberg/IcebergTable.java     | 10 +++++++++-
 .../data/management/copy/iceberg/IcebergDatasetTest.java       |  5 +++--
 4 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index 0ac4dcc0b..85c995844 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -42,12 +42,20 @@ public abstract class BaseIcebergCatalog implements 
IcebergCatalog {
   @Override
   public IcebergTable openTable(String dbName, String tableName) {
     TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
-    return new IcebergTable(tableId, createTableOperations(tableId), 
this.getCatalogUri());
+    return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), 
createTableOperations(tableId), this.getCatalogUri());
   }
 
   protected Catalog createCompanionCatalog(Map<String, String> properties, 
Configuration configuration) {
     return CatalogUtil.loadCatalog(this.companionCatalogClass.getName(), 
this.catalogName, properties, configuration);
   }
 
+  /**
+   * Enable catalog-specific qualification for charting lineage, etc.  This 
default impl is an identity pass-through that adds no qualification.
+   * @return the name to use for the table identified by {@link 
TableIdentifier}
+   */
+  protected String calcDatasetDescriptorName(TableIdentifier tableId) {
+    return tableId.toString(); // default to FQ ID with both table namespace 
and name
+  }
+
   protected abstract TableOperations createTableOperations(TableIdentifier 
tableId);
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 5794a4c03..68e9bb31c 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -28,8 +28,10 @@ import org.apache.iceberg.catalog.TableIdentifier;
  */
 public interface IcebergCatalog {
 
+  /** @return table identified by `dbName` and `tableName` */
   IcebergTable openTable(String dbName, String tableName);
 
+  /** @return table identified by `tableId` */
   default IcebergTable openTable(TableIdentifier tableId) {
     // CHALLENGE: clearly better to implement in the reverse direction - 
`openTable(String, String)` in terms of `openTable(TableIdentifier)` -
     // but challenging to do at this point, with multiple derived classes 
already "in the wild" that implement `openTable(String, String)`
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index a07f6ac89..24febe897 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -70,9 +71,16 @@ public class IcebergTable {
 
   @Getter
   private final TableIdentifier tableId;
+  /** allow the {@link IcebergCatalog} creating this table to qualify its name 
when used for lineage, etc. */
+  private final String datasetDescriptorName;
   private final TableOperations tableOps;
   private final String catalogUri;
 
+  @VisibleForTesting
+  IcebergTable(TableIdentifier tableId, TableOperations tableOps, String 
catalogUri) {
+    this(tableId, tableId.toString(), tableOps, catalogUri);
+  }
+
   /** @return metadata info limited to the most recent (current) snapshot */
   public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
     TableMetadata current = accessTableMetadata();
@@ -188,7 +196,7 @@ public class IcebergTable {
     DatasetDescriptor descriptor = new DatasetDescriptor(
         DatasetConstants.PLATFORM_ICEBERG,
         URI.create(this.catalogUri),
-        this.tableId.toString() // use FQ ID, including table namespace
+        this.datasetDescriptorName
     );
     descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
     return descriptor;
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 0b485e1df..4b457923e 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -116,10 +116,11 @@ public class IcebergDatasetTest {
   @Test
   public void testGetDatasetDescriptor() throws URISyntaxException {
     TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
-    IcebergTable table = new IcebergTable(tableId, 
Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
+    String qualifiedTableName = "foo_prefix." + tableId.toString();
+    IcebergTable table = new IcebergTable(tableId, qualifiedTableName, 
Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
     FileSystem mockFs = Mockito.mock(FileSystem.class);
     Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
-    DatasetDescriptor expected = new 
DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, 
URI.create(SRC_CATALOG_URI), tableId.toString());
+    DatasetDescriptor expected = new 
DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, 
URI.create(SRC_CATALOG_URI), qualifiedTableName);
     expected.addMetadata(DatasetConstants.FS_URI, SRC_FS_URI.toString());
     Assert.assertEquals(table.getDatasetDescriptor(mockFs), expected);
   }

Reply via email to