[ 
https://issues.apache.org/jira/browse/GOBBLIN-1802?focusedWorklogId=852121&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-852121
 ]

ASF GitHub Bot logged work on GOBBLIN-1802:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Mar/23 23:52
            Start Date: 21/Mar/23 23:52
    Worklog Time Spent: 10m 
      Work Description: meethngala commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1144097069


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make 
parameterizable, if desired
 
-  /** Target metastore URI */
-  public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
-      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
   /** Target database name */
   public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
 
-  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem sourceFs) {
+  public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, 
IcebergTable existingTargetIcebergTable, Properties properties, FileSystem 
sourceFs) {
     this.dbName = db;
     this.inputTableName = table;
-    this.icebergTable = icebergTbl;
+    this.srcIcebergTable = srcIcebergTable;
+    this.existingTargetIcebergTable = existingTargetIcebergTable;

Review Comment:
   I have added the Javadoc conveying that we presume/require that the 
destination table exists. I have taken your suggestion to check if it even 
exists before we initiate any replication. In my latest commit, I am checking 
for the table before creating a new `TableOps` for it



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem 
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);

Review Comment:
   yes, we do. It was just a part of refactor -> rename wherein it picked up 
src instead of dst



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;

Review Comment:
   I agree. I have kept it consisted in my latest commit replacing 'target' 
with 'destination'



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem 
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private void addPostPublishStep(List<CopyEntity> copyEntities) {
+    IcebergRegisterStep icebergRegisterStep = new 
IcebergRegisterStep(this.getSrcIcebergTable(), 
this.getExistingTargetIcebergTable());

Review Comment:
   I have added them as method args now... so it will refer whatever gets 
passed to it.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem 
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private void addPostPublishStep(List<CopyEntity> copyEntities) {
+    IcebergRegisterStep icebergRegisterStep = new 
IcebergRegisterStep(this.getSrcIcebergTable(), 
this.getExistingTargetIcebergTable());
+    copyEntities.add(new PostPublishStep(getFileSetId(), Maps.newHashMap(), 
icebergRegisterStep, 0));

Review Comment:
   I agree! I have changed the method definition as your suggestion above :)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +105,35 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog targetIcebergCatalog, 
Properties properties, FileSystem fs) {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    IcebergTable existingTargetIcebergTable = 
targetIcebergCatalog.openTable(dbName, tblName);
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
existingTargetIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws 
IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is 
required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster 
specific properties
-    
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service 
URI is required");
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        break;
+      case TARGET:
+        catalogUri = properties.getProperty(ICEBERG_TARGET_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Target Catalog Table Service 
URI is required");
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_TARGET_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        break;
+      default:
+        throw new UnsupportedOperationException("Incorrect desired location: 
%s provided for creating Iceberg Catalog" + location);
+    }
+    icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    catalogProperties.put(CatalogProperties.URI, catalogUri);

Review Comment:
   I believe since both src and dst will have the same catalog class key... I 
have removed source and kept it generic. Now, both src and dst will have the 
same catalog class key.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 852121)
    Time Spent: 0.5h  (was: 20m)

> Register iceberg table metadata update with destination side catalog
> --------------------------------------------------------------------
>
>                 Key: GOBBLIN-1802
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1802
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to