This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 81910bf15 [GOBBLIN-1976] Allow an `IcebergCatalog` to override the `DatasetDescriptor` platform name for the `IcebergTable`s it creates (#3848)
81910bf15 is described below

commit 81910bf15c111945bf7c0bd65711b61416994189
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Dec 14 14:53:49 2023 -0800

    [GOBBLIN-1976] Allow an `IcebergCatalog` to override the `DatasetDescriptor` platform name for the `IcebergTable`s it creates (#3848)
    
    * Allow an `IcebergCatalog` to override the `DatasetDescriptor` platform for the `IcebergTable`s it creates
    
    * fixup javadoc
---
 .../data/management/copy/iceberg/BaseIcebergCatalog.java      | 11 ++++++++++-
 .../gobblin/data/management/copy/iceberg/IcebergTable.java    |  8 +++++---
 .../data/management/copy/iceberg/IcebergDatasetTest.java      |  5 +++--
 3 files changed, 18 insertions(+), 6 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index 85c995844..9e2ae53b9 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.gobblin.dataset.DatasetConstants;
 
 /**
  * Base implementation of {@link IcebergCatalog} to access {@link 
IcebergTable} and the
@@ -42,7 +43,7 @@ public abstract class BaseIcebergCatalog implements 
IcebergCatalog {
   @Override
   public IcebergTable openTable(String dbName, String tableName) {
     TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
-    return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), 
createTableOperations(tableId), this.getCatalogUri());
+    return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), 
getDatasetDescriptorPlatform(), createTableOperations(tableId), 
this.getCatalogUri());
   }
 
   protected Catalog createCompanionCatalog(Map<String, String> properties, 
Configuration configuration) {
@@ -57,5 +58,13 @@ public abstract class BaseIcebergCatalog implements 
IcebergCatalog {
     return tableId.toString(); // default to FQ ID with both table namespace 
and name
   }
 
+  /**
+   * Enable catalog-specific naming for charting lineage, etc.  This default 
impl gives {@link DatasetConstants#PLATFORM_ICEBERG}
+   * @return the {@link 
org.apache.gobblin.dataset.DatasetDescriptor#getPlatform()} to use for tables 
from this catalog
+   */
+  protected String getDatasetDescriptorPlatform() {
+    return DatasetConstants.PLATFORM_ICEBERG;
+  }
+
   protected abstract TableOperations createTableOperations(TableIdentifier 
tableId);
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 24febe897..245298bcb 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -71,14 +71,16 @@ public class IcebergTable {
 
   @Getter
   private final TableIdentifier tableId;
-  /** allow the {@link IcebergCatalog} creating this table to qualify its name 
when used for lineage, etc. */
+  /** allow the {@link IcebergCatalog} creating this table to qualify its 
{@link DatasetDescriptor#getName()} used for lineage, etc. */
   private final String datasetDescriptorName;
+  /** allow the {@link IcebergCatalog} creating this table to specify the 
{@link DatasetDescriptor#getPlatform()} used for lineage, etc. */
+  private final String datasetDescriptorPlatform;
   private final TableOperations tableOps;
   private final String catalogUri;
 
   @VisibleForTesting
   IcebergTable(TableIdentifier tableId, TableOperations tableOps, String 
catalogUri) {
-    this(tableId, tableId.toString(), tableOps, catalogUri);
+    this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, 
tableOps, catalogUri);
   }
 
   /** @return metadata info limited to the most recent (current) snapshot */
@@ -194,7 +196,7 @@ public class IcebergTable {
 
   public DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
     DatasetDescriptor descriptor = new DatasetDescriptor(
-        DatasetConstants.PLATFORM_ICEBERG,
+        datasetDescriptorPlatform,
         URI.create(this.catalogUri),
         this.datasetDescriptorName
     );
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 4b457923e..a005eb315 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -117,10 +117,11 @@ public class IcebergDatasetTest {
   public void testGetDatasetDescriptor() throws URISyntaxException {
     TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
     String qualifiedTableName = "foo_prefix." + tableId.toString();
-    IcebergTable table = new IcebergTable(tableId, qualifiedTableName, 
Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
+    String platformName = "Floe";
+    IcebergTable table = new IcebergTable(tableId, qualifiedTableName, 
platformName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
     FileSystem mockFs = Mockito.mock(FileSystem.class);
     Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
-    DatasetDescriptor expected = new 
DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, 
URI.create(SRC_CATALOG_URI), qualifiedTableName);
+    DatasetDescriptor expected = new DatasetDescriptor(platformName, 
URI.create(SRC_CATALOG_URI), qualifiedTableName);
     expected.addMetadata(DatasetConstants.FS_URI, SRC_FS_URI.toString());
     Assert.assertEquals(table.getDatasetDescriptor(mockFs), expected);
   }

Reply via email to