meethngala commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1144097069


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired
 
-  /** Target metastore URI */
-  public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
-      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
   /** Target database name */
   public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
 
-  public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) {
+  public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable existingTargetIcebergTable, Properties properties, FileSystem sourceFs) {
     this.dbName = db;
     this.inputTableName = table;
-    this.icebergTable = icebergTbl;
+    this.srcIcebergTable = srcIcebergTable;
+    this.existingTargetIcebergTable = existingTargetIcebergTable;

Review Comment:
   I have added Javadoc stating that we presume/require the destination table to already exist. I have also taken your suggestion to check that it exists before we initiate any replication: in my latest commit, I check for the table before creating a new `TableOps` for it.
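A minimal sketch of the fail-fast existence check, assuming a simplified catalog: `SimpleCatalog`, `tableExists`, `openTable` and `openExistingTable` below are hypothetical stand-ins, not Gobblin's or Iceberg's API (Iceberg's own `Catalog` interface does expose a `tableExists(TableIdentifier)` check). The only point is the ordering: confirm the destination table exists before opening it or creating any `TableOps`, and fail with a clear message otherwise.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, dependency-free stand-in for a catalog; not Iceberg's API.
class SimpleCatalog {
  private final Set<String> tables = new HashSet<>();

  void register(String db, String table) {
    tables.add(db + "." + table);
  }

  boolean tableExists(String db, String table) {
    return tables.contains(db + "." + table);
  }

  // Stand-in for opening the table (and creating its TableOps); returns a qualified name here.
  String openTable(String db, String table) {
    return db + "." + table;
  }
}

class DestinationTableCheck {
  // Checks existence first, so nothing is opened or copied when the destination table is missing.
  static String openExistingTable(SimpleCatalog catalog, String db, String table) {
    if (!catalog.tableExists(db, table)) {
      throw new IllegalStateException(
          String.format("Destination table %s.%s does not exist; it must be created before replication", db, table));
    }
    return catalog.openTable(db, table);
  }
}
```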



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);

Review Comment:
   Yes, we do. It was an artifact of the rename refactor, which picked up `src` instead of `dst` here.
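To illustrate the intended fix, here is a sketch with hypothetical stand-in types (`TableRef` and `DatasetPair`, with plain filesystem URI strings in place of `IcebergTable`, `IcebergDataset` and `FileSystem`). It only shows that each descriptor is built from its own table: the source descriptor from the source table and the destination descriptor from the destination table.

```java
// Hypothetical stand-in for IcebergTable; the real descriptor type is richer.
class TableRef {
  private final String name;

  TableRef(String name) {
    this.name = name;
  }

  // Stand-in for getDatasetDescriptor(fs): tags the table name with a filesystem URI.
  String getDatasetDescriptor(String fsUri) {
    return fsUri + "/" + name;
  }
}

// Hypothetical stand-in for the dataset holding both tables.
class DatasetPair {
  private final TableRef srcIcebergTable;
  private final TableRef destIcebergTable;

  DatasetPair(TableRef src, TableRef dest) {
    this.srcIcebergTable = src;
    this.destIcebergTable = dest;
  }

  String getSourceDataset(String sourceFsUri) {
    return srcIcebergTable.getDatasetDescriptor(sourceFsUri);
  }

  // Must read the destination table, not the source table picked up by the rename.
  String getDestinationDataset(String targetFsUri) {
    return destIcebergTable.getDatasetDescriptor(targetFsUri);
  }
}
```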



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;

Review Comment:
   I agree. I have made the naming consistent in my latest commit by replacing 'target' with 'destination'.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private void addPostPublishStep(List<CopyEntity> copyEntities) {
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(this.getSrcIcebergTable(), this.getExistingTargetIcebergTable());

Review Comment:
   I have made them method arguments now, so the step refers to whatever gets passed in.
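A sketch of the argument-passing version, assuming simplified stand-ins: `Table`, `RegisterStep`, `PublishStep` and `createPostPublishStep` are hypothetical and only mirror the shape of `IcebergTable`, `IcebergRegisterStep` and `PostPublishStep` in the diff. Because the tables are parameters, the register step carries whatever the caller supplies rather than whatever the dataset's fields happen to hold.

```java
// Hypothetical stand-ins for IcebergTable, IcebergRegisterStep and PostPublishStep.
class Table {
  final String name;

  Table(String name) {
    this.name = name;
  }
}

class RegisterStep {
  final Table src;
  final Table dest;

  RegisterStep(Table src, Table dest) {
    this.src = src;
    this.dest = dest;
  }
}

class PublishStep {
  final String fileSetId;
  final RegisterStep step;
  final int priority;

  PublishStep(String fileSetId, RegisterStep step, int priority) {
    this.fileSetId = fileSetId;
    this.step = step;
    this.priority = priority;
  }
}

class PostPublishSketch {
  // Both tables arrive as arguments rather than being read from instance fields,
  // so the register step refers to exactly what the caller passes in.
  static PublishStep createPostPublishStep(String fileSetId, Table src, Table dest) {
    return new PublishStep(fileSetId, new RegisterStep(src, dest), 0);
  }
}
```

Returning the step and letting the caller add it to `copyEntities` keeps the helper free of side effects; appending inside the method, as the diff does, works equally well.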



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private void addPostPublishStep(List<CopyEntity> copyEntities) {
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(this.getSrcIcebergTable(), this.getExistingTargetIcebergTable());
+    copyEntities.add(new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0));

Review Comment:
   I agree! I have changed the method definition as you suggested above :)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +105,35 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog targetIcebergCatalog, Properties properties, FileSystem fs) {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
+    IcebergTable existingTargetIcebergTable = targetIcebergCatalog.openTable(dbName, tblName);
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, existingTargetIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster specific properties
-    Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required");
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        break;
+      case TARGET:
+        catalogUri = properties.getProperty(ICEBERG_TARGET_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Target Catalog Table Service URI is required");
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_TARGET_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        break;
+      default:
+        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    }
+    icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    catalogProperties.put(CatalogProperties.URI, catalogUri);

Review Comment:
   I believe that since src and dst will use the same catalog class, a source-specific key isn't needed, so I have dropped the source prefix and kept the key generic. Now both src and dst read the same catalog class key.
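A sketch of per-location catalog resolution with a single shared class key, under stated assumptions: the `CatalogLocation` values, the `CatalogConfig` holder and every property key string below are hypothetical (the real constants live in `IcebergDatasetFinder`). It also builds the error message with `String.format`; in the diff, `"...%s..." + location` appends the location after the message and leaves a literal `%s` in it.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;

// Hypothetical location enum, using the "destination" naming agreed on in review.
enum CatalogLocation { SOURCE, DESTINATION }

// Hypothetical resolved settings for one catalog.
class CatalogConfig {
  final String uri;
  final String catalogClassName;
  final Optional<String> clusterName;

  CatalogConfig(String uri, String catalogClassName, Optional<String> clusterName) {
    this.uri = uri;
    this.catalogClassName = catalogClassName;
    this.clusterName = clusterName;
  }
}

class CatalogConfigSketch {
  // Hypothetical key names; the real constants are defined in IcebergDatasetFinder.
  static final String SRC_CATALOG_URI_KEY = "example.source.catalog.uri";
  static final String DEST_CATALOG_URI_KEY = "example.destination.catalog.uri";
  static final String SRC_CLUSTER_NAME_KEY = "example.source.cluster.name";
  static final String DEST_CLUSTER_NAME_KEY = "example.destination.cluster.name";
  // One generic class key shared by both locations.
  static final String CATALOG_CLASS_KEY = "example.catalog.class";
  static final String DEFAULT_CATALOG_CLASS = "com.example.DefaultCatalog";

  static CatalogConfig resolve(Properties props, CatalogLocation location) {
    String uriKey;
    String clusterKey;
    switch (location) {
      case SOURCE:
        uriKey = SRC_CATALOG_URI_KEY;
        clusterKey = SRC_CLUSTER_NAME_KEY;
        break;
      case DESTINATION:
        uriKey = DEST_CATALOG_URI_KEY;
        clusterKey = DEST_CLUSTER_NAME_KEY;
        break;
      default:
        // Only reachable if a new enum constant is added; String.format fills in the %s placeholder.
        throw new UnsupportedOperationException(
            String.format("Incorrect desired location: %s provided for creating Iceberg Catalog", location));
    }
    // Throws NullPointerException, like Preconditions.checkNotNull, when the URI is missing.
    String uri = Objects.requireNonNull(props.getProperty(uriKey), location + " catalog URI is required");
    String className = props.getProperty(CATALOG_CLASS_KEY, DEFAULT_CATALOG_CLASS);
    return new CatalogConfig(uri, className, Optional.ofNullable(props.getProperty(clusterKey)));
  }
}
```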


