[
https://issues.apache.org/jira/browse/GOBBLIN-1802?focusedWorklogId=852121&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-852121
]
ASF GitHub Bot logged work on GOBBLIN-1802:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Mar/23 23:52
Start Date: 21/Mar/23 23:52
Worklog Time Spent: 10m
Work Description: meethngala commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1144097069
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
- private final IcebergTable icebergTable;
+ private final IcebergTable srcIcebergTable;
+ private final IcebergTable existingTargetIcebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make
parameterizable, if desired
- /** Target metastore URI */
- public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
- IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
/** Target database name */
public static final String TARGET_DATABASE_KEY =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
- public IcebergDataset(String db, String table, IcebergTable icebergTbl,
Properties properties, FileSystem sourceFs) {
+ public IcebergDataset(String db, String table, IcebergTable srcIcebergTable,
IcebergTable existingTargetIcebergTable, Properties properties, FileSystem
sourceFs) {
this.dbName = db;
this.inputTableName = table;
- this.icebergTable = icebergTbl;
+ this.srcIcebergTable = srcIcebergTable;
+ this.existingTargetIcebergTable = existingTargetIcebergTable;
Review Comment:
I have added Javadoc conveying that we presume/require that the
destination table exists. I have also taken your suggestion to check whether it
exists before we initiate any replication: in my latest commit, I check
for the table before creating a new `TableOps` for it.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
}
protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
- return this.icebergTable.getDatasetDescriptor(sourceFs);
+ return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
}
protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
- return this.icebergTable.getDatasetDescriptor(targetFs);
+ return this.srcIcebergTable.getDatasetDescriptor(targetFs);
Review Comment:
Yes, we do. It was just part of the refactor -> rename, wherein it picked up
src instead of dst.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
- private final IcebergTable icebergTable;
+ private final IcebergTable srcIcebergTable;
+ private final IcebergTable existingTargetIcebergTable;
Review Comment:
I agree. I have kept it consistent in my latest commit by replacing 'target'
with 'destination'.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
}
protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
- return this.icebergTable.getDatasetDescriptor(sourceFs);
+ return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
}
protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
- return this.icebergTable.getDatasetDescriptor(targetFs);
+ return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+ }
+
+ private void addPostPublishStep(List<CopyEntity> copyEntities) {
+ IcebergRegisterStep icebergRegisterStep = new
IcebergRegisterStep(this.getSrcIcebergTable(),
this.getExistingTargetIcebergTable());
Review Comment:
I have added them as method args now, so it will refer to whatever gets
passed to it.
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
}
protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
- return this.icebergTable.getDatasetDescriptor(sourceFs);
+ return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
}
protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
- return this.icebergTable.getDatasetDescriptor(targetFs);
+ return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+ }
+
+ private void addPostPublishStep(List<CopyEntity> copyEntities) {
+ IcebergRegisterStep icebergRegisterStep = new
IcebergRegisterStep(this.getSrcIcebergTable(),
this.getExistingTargetIcebergTable());
+ copyEntities.add(new PostPublishStep(getFileSetId(), Maps.newHashMap(),
icebergRegisterStep, 0));
Review Comment:
I agree! I have changed the method definition as per your suggestion above :)
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +105,35 @@ public Iterator<IcebergDataset> getDatasetsIterator()
throws IOException {
return findDatasets().iterator();
}
- protected IcebergDataset createIcebergDataset(String dbName, String tblName,
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
- IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
- return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+ protected IcebergDataset createIcebergDataset(String dbName, String tblName,
IcebergCatalog sourceIcebergCatalog, IcebergCatalog targetIcebergCatalog,
Properties properties, FileSystem fs) {
+ IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName,
tblName);
+ IcebergTable existingTargetIcebergTable =
targetIcebergCatalog.openTable(dbName, tblName);
+ return new IcebergDataset(dbName, tblName, srcIcebergTable,
existingTargetIcebergTable, properties, fs);
}
- protected IcebergCatalog createIcebergCatalog(Properties properties) throws
IOException, ClassNotFoundException {
+ protected IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
Map<String, String> catalogProperties = new HashMap<>();
- String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is
required");
- catalogProperties.put(CatalogProperties.URI, catalogUri);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- String icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ String catalogUri;
+ String icebergCatalogClassName;
+ switch (location) {
+ case SOURCE:
+ catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service
URI is required");
+ // introducing an optional property for catalogs requiring cluster
specific properties
+
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+ break;
+ case TARGET:
+ catalogUri = properties.getProperty(ICEBERG_TARGET_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Target Catalog Table Service
URI is required");
+ // introducing an optional property for catalogs requiring cluster
specific properties
+
Optional.ofNullable(properties.getProperty(ICEBERG_TARGET_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+ break;
+ default:
+ throw new UnsupportedOperationException("Incorrect desired location:
%s provided for creating Iceberg Catalog" + location);
+ }
+ icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ catalogProperties.put(CatalogProperties.URI, catalogUri);
Review Comment:
I believe both src and dst will have the same catalog class key, so I
have removed 'source' and kept it generic. Now, both src and dst will use the
same catalog class key.
Issue Time Tracking
-------------------
Worklog Id: (was: 852121)
Time Spent: 0.5h (was: 20m)
> Register iceberg table metadata update with destination side catalog
> --------------------------------------------------------------------
>
> Key: GOBBLIN-1802
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1802
> Project: Apache Gobblin
> Issue Type: New Feature
> Reporter: Meeth Gala
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)