[ 
https://issues.apache.org/jira/browse/GOBBLIN-1802?focusedWorklogId=851438&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-851438
 ]

ASF GitHub Bot logged work on GOBBLIN-1802:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Mar/23 01:10
            Start Date: 17/Mar/23 01:10
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1139294562


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make 
parameterizable, if desired
 
-  /** Target metastore URI */
-  public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
-      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
   /** Target database name */
   public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
 
-  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem sourceFs) {
+  public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, 
IcebergTable existingTargetIcebergTable, Properties properties, FileSystem 
sourceFs) {
     this.dbName = db;
     this.inputTableName = table;
-    this.icebergTable = icebergTbl;
+    this.srcIcebergTable = srcIcebergTable;
+    this.existingTargetIcebergTable = existingTargetIcebergTable;

Review Comment:
   It seems we *presume/require* that the target table exists.  Should we clarify this 
in the javadoc and also throw an IllegalArgumentException if we get `null` here?  Should we go 
even further than non-null to ensure it 'exists' in whatever sense this name 
implies (e.g. is initialized)?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem 
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);

Review Comment:
   don't we want dest here?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +105,35 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog targetIcebergCatalog, 
Properties properties, FileSystem fs) {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    IcebergTable existingTargetIcebergTable = 
targetIcebergCatalog.openTable(dbName, tblName);
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
existingTargetIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws 
IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is 
required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster 
specific properties
-    
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service 
URI is required");

Review Comment:
   A cleaner and more encapsulated approach might be to add a method on the enum (class) 
that checks for the property.  For this, you'd initialize each enum value with 
its associated URI key.  Hence you might say:
   ```
   catalogProperties.put(I_C_K, location.getCatalogUri(properties))
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -98,6 +101,10 @@ public class IcebergDatasetTest {
       new MockIcebergTable.SnapshotPaths(Optional.empty(), 
MANIFEST_LIST_PATH_1, Arrays.asList(
           new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_1,
               Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B))));
+  private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_2 =
+      new MockIcebergTable.SnapshotPaths(Optional.empty(), Strings.EMPTY, 
Arrays.asList(
+          new IcebergSnapshotInfo.ManifestFileInfo(Strings.EMPTY,
+              Arrays.asList(Strings.EMPTY))));

Review Comment:
   curious what you're setting up here... wouldn't empty strings for paths be 
garbage input?
   
   I'm puzzled because the purpose doesn't seem to be to use garbage values to 
execute a particular code path...



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +105,35 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog targetIcebergCatalog, 
Properties properties, FileSystem fs) {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    IcebergTable existingTargetIcebergTable = 
targetIcebergCatalog.openTable(dbName, tblName);
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
existingTargetIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws 
IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is 
required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster 
specific properties
-    
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service 
URI is required");
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        break;
+      case TARGET:
+        catalogUri = properties.getProperty(ICEBERG_TARGET_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Target Catalog Table Service 
URI is required");
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_TARGET_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        break;
+      default:
+        throw new UnsupportedOperationException("Incorrect desired location: 
%s provided for creating Iceberg Catalog" + location);
+    }
+    icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    catalogProperties.put(CatalogProperties.URI, catalogUri);

Review Comment:
   any reason not to allow the destination catalog class to differ from the 
source's?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem 
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private void addPostPublishStep(List<CopyEntity> copyEntities) {
+    IcebergRegisterStep icebergRegisterStep = new 
IcebergRegisterStep(this.getSrcIcebergTable(), 
this.getExistingTargetIcebergTable());
+    copyEntities.add(new PostPublishStep(getFileSetId(), Maps.newHashMap(), 
icebergRegisterStep, 0));

Review Comment:
   far better practice since easier to debug, test, and maintain in general is 
not to mutate a collection inside a method but return a value and then have the 
caller add to the collection they 'own'.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +105,35 @@ public Iterator<IcebergDataset> getDatasetsIterator() 
throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog targetIcebergCatalog, 
Properties properties, FileSystem fs) {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    IcebergTable existingTargetIcebergTable = 
targetIcebergCatalog.openTable(dbName, tblName);

Review Comment:
   IIRC opening the table may not verify existence.  if existence is mandatory 
(which is understandable), we'd prefer to detect up front rather than fail at 
the very end.  let's investigate whether there's a good way to pre-check 
existence *on the destination* before beginning what could be a lengthy copy.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;

Review Comment:
   let's adopt consistent terminology.  I'd suggest source and dest, since:
   * our impl uses a class called `SourceAndDestination`
   * we already have a method called `getDestinationDataset` below
   * when code-searching the FG, 'destination' is far more prevalent than 
'target'
   
   let me know if you have counter-examples in mind, but let's converge on the 
more common term.  feel free to confirm which one that is



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem 
getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private void addPostPublishStep(List<CopyEntity> copyEntities) {
+    IcebergRegisterStep icebergRegisterStep = new 
IcebergRegisterStep(this.getSrcIcebergTable(), 
this.getExistingTargetIcebergTable());

Review Comment:
   nit: why use accessors rather than `this.srcIcebergTable`, as you used above?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -25,34 +25,44 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.util.HadoopUtils;
 
+
 /**
  * Finds {@link IcebergDataset}s. Will look for tables in a database using a 
{@link IcebergCatalog},
  * and creates a {@link IcebergDataset} for each one.
  */
 @Slf4j
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDataset> {
-
   public static final String ICEBERG_DATASET_PREFIX = 
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
   public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + 
".database.name";
-  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + 
".table.name";
   public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.class";
   public static final String ICEBERG_SRC_CATALOG_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
   public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX 
+ ".source.cluster.name";
+  public static final String ICEBERG_TARGET_CATALOG_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
+  public static final String ICEBERG_TARGET_CLUSTER_NAME = 
ICEBERG_DATASET_PREFIX + ".target.cluster.name";
+  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + 
".database.name";
+  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + 
".table.name";
+
+  public enum CatalogLocation {
+    SOURCE,
+    TARGET

Review Comment:
   if we decide destination is the term to go with, please update here too



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.TableMetadata;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+
+/**
+ * {@link CommitStep} to perform Iceberg registration.
+ */
+
+@Slf4j
+@AllArgsConstructor
+public class IcebergRegisterStep implements CommitStep {
+
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;
+
+  @Override
+  public boolean isCompleted() throws IOException {
+    return false;
+  }
+
+  @Override
+  public void execute() throws IOException {
+    TableMetadata targetMetadata = null;
+    try {
+      targetMetadata = this.existingTargetIcebergTable.accessTableMetadata();
+    } catch (IcebergTable.TableNotFoundException tnfe) {
+      log.warn("Target TableMetadata doesn't exist because : {}" , tnfe);
+    }
+    
this.srcIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(),
 targetMetadata);

Review Comment:
   are we sure this source table metadata will match what distcp just finished 
moving over to the destination FS?  what if snapshots were committed to the 
source side between the time we planned the WorkUnits and this commit is 
finally reached?
   
   perhaps the possibility of race condition is precluded by code elsewhere... 
if so, great!  most important is a comment here explaining what guarantees this 
to be safe.



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -382,23 +402,36 @@ private static void 
verifyCopyEntities(Collection<CopyEntity> copyEntities, List
     List<String> actual = new ArrayList<>();
     for (CopyEntity copyEntity : copyEntities) {
       String json = copyEntity.toString();
-      String filepath = 
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
-      actual.add(filepath);
+      if (isCopyableFile(json)) {
+        String filepath = 
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
+        actual.add(filepath);
+      }
     }
     Assert.assertEquals(actual.size(), expected.size(), "Set" + 
actual.toString() + " vs Set" + expected.toString());
     Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
   }
 
+  private static boolean isCopyableFile(String json) {

Review Comment:
   I see why you'd need this to filter out the commit step...
   
   also apologies if I'm just not spotting it... but where is the content of 
that commit step verified among these tests?



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -253,15 +263,19 @@ public void 
testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept
     FileSystem sourceFs = sourceBuilder.build();
 
     IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, 
SNAPSHOT_PATHS_0));
+    IcebergTable targetTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2));
     IcebergDataset icebergDataset =
-        new TrickIcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs);
+        new TrickIcebergDataset(testDbName, testTblName, icebergTable, 
targetTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
 
     CopyConfiguration copyConfiguration =
         CopyConfiguration.builder(destFs, 
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
             .copyContext(new CopyContext()).build();
+    try (MockedConstruction<PostPublishStep> mockedPostPublishStep = 
mockConstruction(PostPublishStep.class)) {
+      PostPublishStep step = new 
PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new 
IcebergRegisterStep(icebergTable, targetTable), 0);

Review Comment:
   I'm not totally understanding here... what is the purpose of the mocked 
object you created?  shouldn't `step` be used somewhere, if we're to test 
anything?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 851438)
    Time Spent: 20m  (was: 10m)

> Register iceberg table metadata update with destination side catalog
> --------------------------------------------------------------------
>
>                 Key: GOBBLIN-1802
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1802
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to