This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 81910bf15 [GOBBLIN-1976] Allow an `IcebergCatalog` to override the
`DatasetDescriptor` platform name for the `IcebergTable`s it creates (#3848)
81910bf15 is described below
commit 81910bf15c111945bf7c0bd65711b61416994189
Author: Kip Kohn <[email protected]>
AuthorDate: Thu Dec 14 14:53:49 2023 -0800
[GOBBLIN-1976] Allow an `IcebergCatalog` to override the
`DatasetDescriptor` platform name for the `IcebergTable`s it creates (#3848)
* Allow an `IcebergCatalog` to override the `DatasetDescriptor` platform
for the `IcebergTable`s it creates
* fixup javadoc
---
.../data/management/copy/iceberg/BaseIcebergCatalog.java | 11 ++++++++++-
.../gobblin/data/management/copy/iceberg/IcebergTable.java | 8 +++++---
.../data/management/copy/iceberg/IcebergDatasetTest.java | 5 +++--
3 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index 85c995844..9e2ae53b9 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -24,6 +24,7 @@ import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.gobblin.dataset.DatasetConstants;
/**
* Base implementation of {@link IcebergCatalog} to access {@link
IcebergTable} and the
@@ -42,7 +43,7 @@ public abstract class BaseIcebergCatalog implements IcebergCatalog {
@Override
public IcebergTable openTable(String dbName, String tableName) {
TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
- return new IcebergTable(tableId, calcDatasetDescriptorName(tableId),
createTableOperations(tableId), this.getCatalogUri());
+ return new IcebergTable(tableId, calcDatasetDescriptorName(tableId),
getDatasetDescriptorPlatform(), createTableOperations(tableId),
this.getCatalogUri());
}
protected Catalog createCompanionCatalog(Map<String, String> properties,
Configuration configuration) {
@@ -57,5 +58,13 @@ public abstract class BaseIcebergCatalog implements IcebergCatalog {
return tableId.toString(); // default to FQ ID with both table namespace
and name
}
+ /**
+ * Enable catalog-specific naming for charting lineage, etc. This default
impl gives {@link DatasetConstants#PLATFORM_ICEBERG}
+ * @return the {@link
org.apache.gobblin.dataset.DatasetDescriptor#getPlatform()} to use for tables
from this catalog
+ */
+ protected String getDatasetDescriptorPlatform() {
+ return DatasetConstants.PLATFORM_ICEBERG;
+ }
+
protected abstract TableOperations createTableOperations(TableIdentifier
tableId);
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 24febe897..245298bcb 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -71,14 +71,16 @@ public class IcebergTable {
@Getter
private final TableIdentifier tableId;
- /** allow the {@link IcebergCatalog} creating this table to qualify its name
when used for lineage, etc. */
+ /** allow the {@link IcebergCatalog} creating this table to qualify its
{@link DatasetDescriptor#getName()} used for lineage, etc. */
private final String datasetDescriptorName;
+ /** allow the {@link IcebergCatalog} creating this table to specify the
{@link DatasetDescriptor#getPlatform()} used for lineage, etc. */
+ private final String datasetDescriptorPlatform;
private final TableOperations tableOps;
private final String catalogUri;
@VisibleForTesting
IcebergTable(TableIdentifier tableId, TableOperations tableOps, String
catalogUri) {
- this(tableId, tableId.toString(), tableOps, catalogUri);
+ this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG,
tableOps, catalogUri);
}
/** @return metadata info limited to the most recent (current) snapshot */
@@ -194,7 +196,7 @@ public class IcebergTable {
public DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
DatasetDescriptor descriptor = new DatasetDescriptor(
- DatasetConstants.PLATFORM_ICEBERG,
+ datasetDescriptorPlatform,
URI.create(this.catalogUri),
this.datasetDescriptorName
);
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 4b457923e..a005eb315 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -117,10 +117,11 @@ public class IcebergDatasetTest {
public void testGetDatasetDescriptor() throws URISyntaxException {
TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
String qualifiedTableName = "foo_prefix." + tableId.toString();
- IcebergTable table = new IcebergTable(tableId, qualifiedTableName,
Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
+ String platformName = "Floe";
+ IcebergTable table = new IcebergTable(tableId, qualifiedTableName,
platformName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
FileSystem mockFs = Mockito.mock(FileSystem.class);
Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
- DatasetDescriptor expected = new
DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG,
URI.create(SRC_CATALOG_URI), qualifiedTableName);
+ DatasetDescriptor expected = new DatasetDescriptor(platformName,
URI.create(SRC_CATALOG_URI), qualifiedTableName);
expected.addMetadata(DatasetConstants.FS_URI, SRC_FS_URI.toString());
Assert.assertEquals(table.getDatasetDescriptor(mockFs), expected);
}