phet commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1145335584


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingDestinationIcebergTable;

Review Comment:
   nit: mostly because you're generating `@Getter`s... shall we call it simply `destIcebergTable` or `destinationIcebergTable`?
   
   we'd only need to keep it as `getExistingDestinationIcebergTable()` if this same class also had a different kind of dest table as another field.
   
   definitely let's javadoc the fact that it should already exist! But there's no need to force everyone using our accessors to repeat that expectation back to us on every reference.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -74,18 +84,15 @@ public List<IcebergDataset> findDatasets() throws IOException {
     String dbName = properties.getProperty(ICEBERG_DB_NAME);
     String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
 
-    try {
-      IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
-      /* Each Iceberg dataset maps to an Iceberg table
-       * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
-       */
-      matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
-      log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
-          matchingDatasets, dbName, tblName);
-      return matchingDatasets;
-    } catch (ReflectiveOperationException exception) {
-      throw new IOException(exception);
-    }
+    IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+    IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+    /* Each Iceberg dataset maps to an Iceberg table
+     * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
+     */
+    matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+    log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),

Review Comment:
   hadn't noticed this before, but looking at it now...
   
   does the `.size()` provide anything useful?  when would it ever be `!= 1`?  e.g. on error, won't we throw `IOException` rather than return an empty list?
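
To make the point concrete, here is a minimal sketch, assuming a hypothetical `IcebergDatasetStub` in place of `IcebergDataset` and a hypothetical up-front null check. The finder either returns exactly one dataset or throws, so there is no code path where the logged size could be anything but 1.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for IcebergDataset: just the db and table names. */
final class IcebergDatasetStub {
  final String dbName;
  final String tableName;

  IcebergDatasetStub(String dbName, String tableName) {
    this.dbName = dbName;
    this.tableName = tableName;
  }

  @Override
  public String toString() {
    return dbName + "." + tableName;
  }
}

class SingleTableFinderSketch {
  /**
   * Returns exactly one dataset or throws; no path yields an empty list,
   * so logging the list's size tells the reader nothing new.
   */
  static List<IcebergDatasetStub> findDatasets(String dbName, String tblName) throws IOException {
    if (dbName == null || tblName == null) {
      // Hypothetical up-front check; in the PR, errors would instead surface
      // as IOException from catalog or dataset creation.
      throw new IOException("both database and table name are required");
    }
    IcebergDatasetStub dataset = new IcebergDatasetStub(dbName, tblName);
    // Stand-in for: log.info("Found dataset {} for database {} and table {}", dataset, dbName, tblName);
    System.out.println("Found dataset " + dataset);
    return Collections.singletonList(dataset);
  }
}
```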



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.TableMetadata;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+
+/**
+ * {@link CommitStep} to perform Iceberg registration.
+ */
+
+@Slf4j
+@AllArgsConstructor
+public class IcebergRegisterStep implements CommitStep {
+
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingDestinationIcebergTable;
+
+  @Override
+  public boolean isCompleted() throws IOException {
+    return false;
+  }
+
+  @Override
+  public void execute() throws IOException {
+    TableMetadata destinationMetadata = null;
+    try {
+      destinationMetadata = this.existingDestinationIcebergTable.accessTableMetadata();
+    } catch (IcebergTable.TableNotFoundException tnfe) {
+      log.warn("Destination TableMetadata doesn't exist because : {}" , tnfe);

Review Comment:
   since this will call the `warn(String, Throwable)` version, not the `warn(String, Object)` one, the `{}` won't be substituted and would be logged literally, so let's drop it.
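
Here is a self-contained sketch of why: Java overload resolution statically picks the most specific applicable method, and `Throwable` is more specific than `Object`. The `OverloadDemoLogger` below is hypothetical. It only mimics the two SLF4J `warn` overloads, assuming (as with SLF4J) that the `Throwable` form uses its message verbatim.

```java
/**
 * Hypothetical logger declaring the same two overloads as SLF4J's Logger#warn,
 * recording the message that would be emitted.
 */
class OverloadDemoLogger {
  String lastMessage;

  void warn(String format, Object arg) {
    // Parameterized form: the first "{}" is replaced by the argument.
    int i = format.indexOf("{}");
    lastMessage = (i < 0) ? format : format.substring(0, i) + arg + format.substring(i + 2);
  }

  void warn(String msg, Throwable t) {
    // Throwable form: the message is used verbatim (the stack trace is logged separately).
    lastMessage = msg;
  }
}

class WarnOverloadDemo {
  public static void main(String[] args) {
    OverloadDemoLogger log = new OverloadDemoLogger();
    Exception tnfe = new IllegalStateException("table not found");
    // Java picks warn(String, Throwable), the more specific overload.
    log.warn("Destination TableMetadata doesn't exist because : {}", tnfe);
    System.out.println(log.lastMessage); // the "{}" remains, unreplaced
    // Suggested fix: no placeholder, exception as the last argument.
    log.warn("Destination TableMetadata doesn't exist", tnfe);
    System.out.println(log.lastMessage);
  }
}
```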



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,7 +153,9 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
       fileEntity.setSourceData(getSourceDataset(this.sourceFs));
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
       copyEntities.add(fileEntity);
+
     }
+    copyEntities.add(addPostPublishStep(this.srcIcebergTable, this.existingDestinationIcebergTable));

Review Comment:
   `createPostPublishStep`, since the method itself adds nothing (the caller's `copyEntities.add(...)` does)?
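
A minimal sketch of the naming split. It uses hypothetical stand-ins suffixed `Sketch` for Gobblin's `CommitStep`, `CopyEntity` and `PostPublishStep`, not the real classes. The factory only builds and returns the step, and the caller is the one that adds it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-ins for Gobblin's CommitStep / CopyEntity / PostPublishStep. */
interface CommitStepSketch {
  void execute();
}

class CopyEntitySketch {
}

class PostPublishStepSketch extends CopyEntitySketch {
  final CommitStepSketch step;

  PostPublishStepSketch(CommitStepSketch step) {
    this.step = step;
  }
}

/** Hypothetical stand-in for IcebergRegisterStep, holding only table names. */
class RegisterStepSketch implements CommitStepSketch {
  final String srcTable;
  final String destTable;

  RegisterStepSketch(String srcTable, String destTable) {
    this.srcTable = srcTable;
    this.destTable = destTable;
  }

  @Override
  public void execute() {
    // would register the source table's metadata onto the destination table
  }
}

class CopyEntitiesSketch {
  /** Only builds the step and adds it nowhere, hence "create" rather than "add". */
  static PostPublishStepSketch createPostPublishStep(String srcTable, String destTable) {
    return new PostPublishStepSketch(new RegisterStepSketch(srcTable, destTable));
  }

  static Collection<CopyEntitySketch> generateCopyEntities(
      List<CopyEntitySketch> fileEntities, String srcTable, String destTable) {
    List<CopyEntitySketch> copyEntities = new ArrayList<>(fileEntities);
    copyEntities.add(createPostPublishStep(srcTable, destTable)); // the caller does the adding
    return copyEntities;
  }
}
```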



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -419,14 +437,21 @@ private static void verifyAncestorPermissions(List<CopyEntityDeserializer.FileOw
     }
   }
 
+  private static void verifyPostPublishStep(String json) {
+    String expectedCommitStep = "org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";
+    String actualCommitStep = new Gson().fromJson(json, JsonObject.class)
+        .getAsJsonObject("object-data").getAsJsonObject("step").getAsJsonPrimitive("object-type").getAsString();
+    Assert.assertEquals(actualCommitStep, expectedCommitStep);
+  }
+
   /**
   *  Sadly, this is needed to avoid losing `FileSystem` mock to replacement from the `FileSystem.get` `static`
   *  Without this, so to lose the mock, we'd be unable to set up any source paths as existing.
    */
   protected static class TrickIcebergDataset extends IcebergDataset {
-    public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties,
+    public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, IcebergTable targetIcebergTbl, Properties properties,

Review Comment:
   nit: target => dest (for consistency w/ rest of this PR)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -25,34 +25,44 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.util.HadoopUtils;
 
+
 /**
  * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog},
  * and creates a {@link IcebergDataset} for each one.
  */
 @Slf4j
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
-
   public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
   public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
-  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
-  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
+  public static final String ICEBERG_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";

Review Comment:
   first off, the Java identifier no longer says `SRC`, but the user-facing property name still contains `.source`...
   
   anyway, presuming a common catalog class for both source and dest seems too inflexible for OSS code that aspires to be agnostic to user requirements.  thus let's have a separate prop for each.
   
   your choice, but if you really do wish to support the convenience of setting only one prop, then have three props: src, dest, and "unspecified" as a fallback.  adopt the semantics that src or dest may be omitted, but whenever either is skipped, "unspecified" becomes required and is used as the fallback.
   
   by "unspecified", I mean:
   ```ICEBERG_DATASET_PREFIX + ".catalog.class"```
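
A minimal sketch of the three-prop fallback semantics. It assumes `ICEBERG_DATASET_PREFIX` resolves to `"iceberg.dataset"`. The `.destination.catalog.class` key and the `CatalogClassResolver` name are hypothetical, and the enum only mirrors the PR's `CatalogLocation`.

```java
import java.util.Properties;

class CatalogClassResolver {
  // ASSUMPTION: mirrors ICEBERG_DATASET_PREFIX; the destination key name is hypothetical.
  static final String PREFIX = "iceberg.dataset";
  static final String SRC_CATALOG_CLASS_KEY = PREFIX + ".source.catalog.class";
  static final String DEST_CATALOG_CLASS_KEY = PREFIX + ".destination.catalog.class";
  static final String FALLBACK_CATALOG_CLASS_KEY = PREFIX + ".catalog.class";

  enum CatalogLocation { SOURCE, DESTINATION }

  /**
   * Returns the location-specific catalog class when set; otherwise the
   * "unspecified" fallback, which in that case becomes mandatory.
   */
  static String resolveCatalogClass(Properties props, CatalogLocation location) {
    String specificKey =
        (location == CatalogLocation.SOURCE) ? SRC_CATALOG_CLASS_KEY : DEST_CATALOG_CLASS_KEY;
    String specific = props.getProperty(specificKey);
    if (specific != null) {
      return specific;
    }
    String fallback = props.getProperty(FALLBACK_CATALOG_CLASS_KEY);
    if (fallback == null) {
      throw new IllegalArgumentException(String.format(
          "neither '%s' nor fallback '%s' is set", specificKey, FALLBACK_CATALOG_CLASS_KEY));
    }
    return fallback;
  }
}
```

This keeps the common case to one prop while still letting source and destination diverge when needed.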



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java:
##########
@@ -51,7 +54,10 @@ public String getCatalogUri() {
   }
 
   @Override
-  protected TableOperations createTableOperations(TableIdentifier tableId) {
+  protected TableOperations createTableOperations(TableIdentifier tableId) throws IOException {
+    if (!hc.tableExists(tableId)) {
+      throw new IcebergTable.TableNotFoundException(tableId);
+    }

Review Comment:
   since the underlying `iceberg.Catalog` abstraction allows for creating a table that doesn't (yet) exist, we probably don't wish to prohibit that here.  a better alternative would be to define an explicit predicate that a caller can invoke to verify beforehand that the table exists.
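
A minimal sketch of that alternative, assuming a toy in-memory catalog. `CatalogSketch`, its `tableAlreadyExists` predicate and `DestinationCheckSketch` are hypothetical names. In Gobblin the predicate could presumably delegate to Iceberg's `Catalog#tableExists(TableIdentifier)`, the same call as `hc.tableExists(tableId)` in the hunk above. Opening a table handle stays permissive, and only the caller that needs an existing destination checks first.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical minimal catalog; a real one would delegate to Catalog#tableExists. */
class CatalogSketch {
  private final Set<String> existingTables = new HashSet<>();

  void create(String tableId) {
    existingTables.add(tableId);
  }

  /** Explicit predicate callers can use to pre-verify existence. */
  boolean tableAlreadyExists(String tableId) {
    return existingTables.contains(tableId);
  }

  /** Opening a handle does NOT require existence (the table might be created later). */
  String openTable(String tableId) {
    return "handle:" + tableId;
  }
}

class DestinationCheckSketch {
  /** The caller that truly needs an existing table asks first, then opens. */
  static String requireExistingDest(CatalogSketch catalog, String tableId) {
    if (!catalog.tableAlreadyExists(tableId)) {
      throw new IllegalStateException("destination table must already exist: " + tableId);
    }
    return catalog.openTable(tableId);
  }
}
```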



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
