[ 
https://issues.apache.org/jira/browse/GOBBLIN-1802?focusedWorklogId=852360&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-852360
 ]

ASF GitHub Bot logged work on GOBBLIN-1802:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Mar/23 22:05
            Start Date: 22/Mar/23 22:05
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1145335584


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingDestinationIcebergTable;

Review Comment:
   nit: mostly because you're generating `@Getter`s... shall we call it merely 
`destIcebergTable` or `destinationIcebergTable`?
   
   we'd only need to keep as `getExistingDestinationIcebergTable()` if there 
were another different kind of dest table, also a field of this same class.
   
   definitely let's javadoc the fact that it should already exist, but there's 
no need to force everyone using our accessors to repeat that expectation back 
to us on every reference.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -74,18 +84,15 @@ public List<IcebergDataset> findDatasets() throws 
IOException {
     String dbName = properties.getProperty(ICEBERG_DB_NAME);
     String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
 
-    try {
-      IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
-      /* Each Iceberg dataset maps to an Iceberg table
-       * TODO: The user provided database and table names needs to be 
pre-checked and verified against the existence of a valid Iceberg table
-       */
-      matchingDatasets.add(createIcebergDataset(dbName, tblName, 
icebergCatalog, properties, sourceFs));
-      log.info("Found {} matching datasets: {} for the database name: {} and 
table name: {}", matchingDatasets.size(),
-          matchingDatasets, dbName, tblName);
-      return matchingDatasets;
-    } catch (ReflectiveOperationException exception) {
-      throw new IOException(exception);
-    }
+    IcebergCatalog sourceIcebergCatalog = 
createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+    IcebergCatalog destinationIcebergCatalog = 
createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+    /* Each Iceberg dataset maps to an Iceberg table
+     * TODO: The user provided database and table names needs to be 
pre-checked and verified against the existence of a valid Iceberg table
+     */
+    matchingDatasets.add(createIcebergDataset(dbName, tblName, 
sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+    log.info("Found {} matching datasets: {} for the database name: {} and 
table name: {}", matchingDatasets.size(),

Review Comment:
   hadn't recalled seeing before, but on looking now...
   
   does the `.size()` provide anything useful?  when would it ever be `!= 1`?  
e.g. on an error, won't we throw `IOException`, rather than returning with an 
empty list?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.TableMetadata;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+
+/**
+ * {@link CommitStep} to perform Iceberg registration.
+ */
+
+@Slf4j
+@AllArgsConstructor
+public class IcebergRegisterStep implements CommitStep {
+
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingDestinationIcebergTable;
+
+  @Override
+  public boolean isCompleted() throws IOException {
+    return false;
+  }
+
+  @Override
+  public void execute() throws IOException {
+    TableMetadata destinationMetadata = null;
+    try {
+      destinationMetadata = 
this.existingDestinationIcebergTable.accessTableMetadata();
+    } catch (IcebergTable.TableNotFoundException tnfe) {
+      log.warn("Destination TableMetadata doesn't exist because : {}" , tnfe);

Review Comment:
   since this will call the `warn(String, Throwable)` version, not the 
`warn(String, Object)` one, I doubt the `{}` should be present
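   
   a minimal sketch of the overload resolution behind that remark. the two 
static methods are hypothetical stand-ins that only mirror the shape of the 
two logger overloads named above; they are not SLF4J itself, and the class 
name `WarnOverloadDemo` is made up for illustration:

```java
// Hypothetical stand-in for the two logger overloads; this is NOT SLF4J.
class WarnOverloadDemo {

  // Mirrors the shape of warn(String format, Object arg): "{}" gets substituted.
  static String warn(String format, Object arg) {
    return "warn(String, Object): " + format.replace("{}", String.valueOf(arg));
  }

  // Mirrors the shape of warn(String msg, Throwable t): msg is used verbatim.
  static String warn(String msg, Throwable t) {
    return "warn(String, Throwable): " + msg;
  }

  public static void main(String[] args) {
    Exception tnfe = new IllegalStateException("no such table");

    // The argument's static type is a Throwable subtype, so the compiler picks
    // the more specific (String, Throwable) overload and "{}" stays literal.
    System.out.println(warn("Destination TableMetadata doesn't exist because : {}", tnfe));

    // Casting to Object forces the formatting overload instead.
    System.out.println(warn("Destination TableMetadata doesn't exist because : {}", (Object) tnfe));
  }
}
```

   so one option is to drop the `{}` and let the exception ride along as the 
`Throwable` argument.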



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,7 +153,9 @@ Collection<CopyEntity> generateCopyEntities(FileSystem 
targetFs, CopyConfigurati
       fileEntity.setSourceData(getSourceDataset(this.sourceFs));
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
       copyEntities.add(fileEntity);
+
     }
+    copyEntities.add(addPostPublishStep(this.srcIcebergTable, 
this.existingDestinationIcebergTable));

Review Comment:
   `createPostPublishStep`, since it adds nothing?



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -419,14 +437,21 @@ private static void 
verifyAncestorPermissions(List<CopyEntityDeserializer.FileOw
     }
   }
 
+  private static void verifyPostPublishStep(String json) {
+    String expectedCommitStep = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";
+    String actualCommitStep = new Gson().fromJson(json, JsonObject.class)
+        
.getAsJsonObject("object-data").getAsJsonObject("step").getAsJsonPrimitive("object-type").getAsString();
+    Assert.assertEquals(actualCommitStep, expectedCommitStep);
+  }
+
   /**
    *  Sadly, this is needed to avoid losing `FileSystem` mock to replacement 
from the `FileSystem.get` `static`
    *  Without this, so to lose the mock, we'd be unable to set up any source 
paths as existing.
    */
   protected static class TrickIcebergDataset extends IcebergDataset {
-    public TrickIcebergDataset(String db, String table, IcebergTable 
icebergTbl, Properties properties,
+    public TrickIcebergDataset(String db, String table, IcebergTable 
icebergTbl, IcebergTable targetIcebergTbl, Properties properties,

Review Comment:
   nit: target => dest (for consistency w/ rest of this PR)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -25,34 +25,44 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.util.HadoopUtils;
 
+
 /**
  * Finds {@link IcebergDataset}s. Will look for tables in a database using a 
{@link IcebergCatalog},
  * and creates a {@link IcebergDataset} for each one.
  */
 @Slf4j
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDataset> {
-
   public static final String ICEBERG_DATASET_PREFIX = 
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
   public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + 
".database.name";
-  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + 
".table.name";
-  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.class";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
+  public static final String ICEBERG_CATALOG_CLASS_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.class";

Review Comment:
   first off, the java identifier no longer says `SRC`, but the property used 
by users still contains (`.source`)...
   
   anyway, the presumption of a common catalog class for both source and dest 
seems too inflexible in this OSS code aspiring to be agnostic to user reqs.  
thus let's have a separate prop for each.
   
   your choice, but if you really do wish to support the convenience of only 
one prop to set, then have three props - src, dest, and "unspecified", as a 
fallback.  adopt the semantics that src or dest may be omitted, but when either 
is skipped, then "unspecified" becomes non-optional and gets used as the 
fallback.
   
   by "unspecified", I mean:
   ```ICEBERG_DATASET_PREFIX + ".catalog.class"```
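   
   a rough sketch of those fallback semantics, under stated assumptions: the 
prefix value `iceberg.dataset`, the `.destination.catalog.class` key name, the 
`CatalogClassResolver` class, and the `com.example.*` catalog class names are 
all hypothetical and only illustrate the lookup order (specific prop first, 
then the "unspecified" one, which becomes required once a specific prop is 
omitted):

```java
import java.util.Properties;

// Hypothetical helper; key names and prefix value are assumptions for illustration.
class CatalogClassResolver {
  static final String PREFIX = "iceberg.dataset"; // assumed value of ICEBERG_DATASET_PREFIX
  static final String SRC_KEY = PREFIX + ".source.catalog.class";
  static final String DEST_KEY = PREFIX + ".destination.catalog.class"; // assumed key name
  static final String FALLBACK_KEY = PREFIX + ".catalog.class"; // the "unspecified" prop

  // Returns the specific prop when set; otherwise the fallback, which is then mandatory.
  static String resolve(Properties props, String specificKey) {
    String specific = props.getProperty(specificKey);
    if (specific != null && !specific.trim().isEmpty()) {
      return specific;
    }
    String fallback = props.getProperty(FALLBACK_KEY);
    if (fallback == null || fallback.trim().isEmpty()) {
      throw new IllegalArgumentException(
          "Neither '" + specificKey + "' nor '" + FALLBACK_KEY + "' is set");
    }
    return fallback;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(SRC_KEY, "com.example.SourceCatalog");   // hypothetical class
    props.setProperty(FALLBACK_KEY, "com.example.SharedCatalog"); // hypothetical class

    System.out.println(resolve(props, SRC_KEY));  // source prop is set, so it wins
    System.out.println(resolve(props, DEST_KEY)); // dest omitted, so fallback is used
  }
}
```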



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java:
##########
@@ -51,7 +54,10 @@ public String getCatalogUri() {
   }
 
   @Override
-  protected TableOperations createTableOperations(TableIdentifier tableId) {
+  protected TableOperations createTableOperations(TableIdentifier tableId) 
throws IOException {
+    if (!hc.tableExists(tableId)) {
+      throw new IcebergTable.TableNotFoundException(tableId);
+    }

Review Comment:
   since the underlying `iceberg.Catalog` abstraction allows for creating a 
table that doesn't (yet) exist, we probably don't wish to prohibit that here.  
a better alternative would be to define an explicit predicate for a caller to 
invoke that pre-verifies the table's existence.
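   
   one possible shape for that, sketched with a simplified in-memory stand-in. 
`InMemoryCatalog`, `TableHandle`, and every method on them are hypothetical 
and are not the actual `IcebergCatalog` or `IcebergTable` API; the point is 
only that opening a table never throws on absence, while existence is 
checked through a separate predicate:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model; not the real Gobblin or Iceberg classes.
class ExistencePredicateDemo {

  static class InMemoryCatalog {
    private final Map<String, String> metadataByTable = new HashMap<>();

    // Explicit predicate a caller may invoke to pre-verify existence.
    boolean tableExists(String tableId) {
      return metadataByTable.containsKey(tableId);
    }

    // Opening never fails on a missing table, so it can still be created later.
    TableHandle openTable(String tableId) {
      return new TableHandle(this, tableId);
    }
  }

  static class TableHandle {
    private final InMemoryCatalog catalog;
    private final String tableId;

    TableHandle(InMemoryCatalog catalog, String tableId) {
      this.catalog = catalog;
      this.tableId = tableId;
    }

    // Empty when the table has not been created yet.
    Optional<String> currentMetadata() {
      return Optional.ofNullable(catalog.metadataByTable.get(tableId));
    }

    // Creates the table on first commit, replaces its metadata afterwards.
    void commit(String metadata) {
      catalog.metadataByTable.put(tableId, metadata);
    }
  }

  public static void main(String[] args) {
    InMemoryCatalog catalog = new InMemoryCatalog();
    TableHandle dest = catalog.openTable("db.tbl");
    System.out.println(catalog.tableExists("db.tbl")); // not yet created
    dest.commit("metadata-v1");
    System.out.println(catalog.tableExists("db.tbl")); // created by the commit
  }
}
```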





Issue Time Tracking
-------------------

    Worklog Id:     (was: 852360)
    Time Spent: 50m  (was: 40m)

> Register iceberg table metadata update with destination side catalog
> --------------------------------------------------------------------
>
>                 Key: GOBBLIN-1802
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1802
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
