[ 
https://issues.apache.org/jira/browse/GOBBLIN-1709?focusedWorklogId=809626&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-809626
 ]

ASF GitHub Bot logged work on GOBBLIN-1709:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Sep/22 18:59
            Start Date: 16/Sep/22 18:59
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3560:
URL: https://github.com/apache/gobblin/pull/3560#discussion_r973289743


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());
+
+
+    for (CopyableFile.Builder builder : 
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      log.info("Adding file path: {} to the collection of copy entities and 
copying to destination: {}",
+          fileEntity.getFileStatus().getPath(), 
fileEntity.getDestination().toString());
+      copyEntities.add(fileEntity);
+    }
+    return copyEntities;
+  }
+
+  /**
+   * Get builders for a {@link CopyableFile} for each file referred to by a 
{@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+   */
+  protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, 
FileStatus> paths, CopyConfiguration configuration) throws IOException {
+
+    List<CopyableFile.Builder> builders = Lists.newArrayList();
+    List<SourceAndDestination> dataFiles = Lists.newArrayList();
+    Configuration defaultHadoopConfiguration = new Configuration();
+    FileSystem actualSourceFs;
+
+    for(Map.Entry<Path, FileStatus> entry : paths.entrySet()) {
+      dataFiles.add(new SourceAndDestination(entry.getValue(), 
this.fs.makeQualified(entry.getKey())));
+    }
+
+    for(SourceAndDestination sourceAndDestination : dataFiles) {

Review Comment:
   formatting is off--need a space after the `for` keyword (i.e. `for (` rather than `for(`)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());
+
+
+    for (CopyableFile.Builder builder : 
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      log.info("Adding file path: {} to the collection of copy entities and 
copying to destination: {}",
+          fileEntity.getFileStatus().getPath(), 
fileEntity.getDestination().toString());
+      copyEntities.add(fileEntity);
+    }
+    return copyEntities;
+  }
+
+  /**
+   * Get builders for a {@link CopyableFile} for each file referred to by a 
{@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.

Review Comment:
   where's the `StorageDescriptor` in `Map<Path, FileStatus>`?  is this comment 
correct?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());
+
+
+    for (CopyableFile.Builder builder : 
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      log.info("Adding file path: {} to the collection of copy entities and 
copying to destination: {}",

Review Comment:
   if we really think this level of debugging is necessary (I'm currently an 
even 50-50 split on whether or not it is...) we could keep it at the `.debug` level, 
with the idea that it gets selectively enabled, case-by-case



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;

Review Comment:
   are any of these expected to change or can they be `final`?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());
+
+
+    for (CopyableFile.Builder builder : 
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      log.info("Adding file path: {} to the collection of copy entities and 
copying to destination: {}",

Review Comment:
   this feels way too noisy.  for reference, each Hive daily tracking 
partition has > 3600 files!  this means that copying 3 partitions would log 
over 10k lines (once their table management eventually moves from Hive to 
Iceberg).



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();

Review Comment:
   nit: map of paths to what?  clearer would be `pathToFileStatus`



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableFileSet.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.partition.FileSet;
+
+
+/**
+ * A {@link IcebergTableFileSet} that generates {@link CopyEntity}s for a Hive 
Catalog based Iceberg table

Review Comment:
   I see nothing related to hive catalogs in this impl.  In fact, I see nothing 
related to any kind of iceberg catalog!  what felt important to capture here?  
IMO it risks misleading...
   
   also, no need to repeat the class's own name at the beginning of its javadoc.
   
   let's strike this sentence in favor of the one just below



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());
+
+
+    for (CopyableFile.Builder builder : 
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      log.info("Adding file path: {} to the collection of copy entities and 
copying to destination: {}",
+          fileEntity.getFileStatus().getPath(), 
fileEntity.getDestination().toString());
+      copyEntities.add(fileEntity);
+    }
+    return copyEntities;
+  }
+
+  /**
+   * Get builders for a {@link CopyableFile} for each file referred to by a 
{@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+   */
+  protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, 
FileStatus> paths, CopyConfiguration configuration) throws IOException {
+
+    List<CopyableFile.Builder> builders = Lists.newArrayList();
+    List<SourceAndDestination> dataFiles = Lists.newArrayList();
+    Configuration defaultHadoopConfiguration = new Configuration();
+    FileSystem actualSourceFs;
+
+    for(Map.Entry<Path, FileStatus> entry : paths.entrySet()) {
+      dataFiles.add(new SourceAndDestination(entry.getValue(), 
this.fs.makeQualified(entry.getKey())));
+    }
+
+    for(SourceAndDestination sourceAndDestination : dataFiles) {
+      actualSourceFs = 
sourceAndDestination.getSource().getPath().getFileSystem(defaultHadoopConfiguration);
+
+      // TODO Add ancestor owner and permissions in future releases
+      builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, 
sourceAndDestination.getSource(),
+          sourceAndDestination.getDestination(), configuration));
+    }
+    return builders;
+  }
+  /**
+   * Finds all files read by the Iceberg table including metadata json file, 
manifest files, nested manifest file paths and actual data files.
+   * Returns a map of path, file status for each file that needs to be copied
+   */
+  protected Map<Path, FileStatus> getFilePaths() throws IOException {

Review Comment:
   nit: `getFilePathsToFileStatus`



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());
+
+
+    for (CopyableFile.Builder builder : 
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      log.info("Adding file path: {} to the collection of copy entities and 
copying to destination: {}",
+          fileEntity.getFileStatus().getPath(), 
fileEntity.getDestination().toString());
+      copyEntities.add(fileEntity);
+    }
+    return copyEntities;
+  }
+
+  /**
+   * Get builders for a {@link CopyableFile} for each file referred to by a 
{@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+   */
+  protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, 
FileStatus> paths, CopyConfiguration configuration) throws IOException {
+
+    List<CopyableFile.Builder> builders = Lists.newArrayList();
+    List<SourceAndDestination> dataFiles = Lists.newArrayList();
+    Configuration defaultHadoopConfiguration = new Configuration();
+    FileSystem actualSourceFs;
+
+    for(Map.Entry<Path, FileStatus> entry : paths.entrySet()) {
+      dataFiles.add(new SourceAndDestination(entry.getValue(), 
this.fs.makeQualified(entry.getKey())));
+    }
+
+    for(SourceAndDestination sourceAndDestination : dataFiles) {
+      actualSourceFs = 
sourceAndDestination.getSource().getPath().getFileSystem(defaultHadoopConfiguration);
+
+      // TODO Add ancestor owner and permissions in future releases
+      builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, 
sourceAndDestination.getSource(),
+          sourceAndDestination.getDestination(), configuration));
+    }
+    return builders;
+  }
+  /**
+   * Finds all files read by the Iceberg table including metadata json file, 
manifest files, nested manifest file paths and actual data files.
+   * Returns a map of path, file status for each file that needs to be copied
+   */
+  protected Map<Path, FileStatus> getFilePaths() throws IOException {
+    Map<Path, FileStatus> result = Maps.newHashMap();
+    IcebergTable icebergTable = this.getIcebergTable();
+    IcebergSnapshotInfo icebergSnapshotInfo = 
icebergTable.getCurrentSnapshotInfo();
+
+    log.info("Fetching all file paths for the current snapshot id: {} of the 
Iceberg table with metadata path: {}",

Review Comment:
   I like the spirit of the logging, but see room for improvement:
   a. be less conversational and more direct, even "mechanical"
   b. that's because we want to grep these or otherwise machine-process log 
lines.  hence, strings, especially paths, deserve delimiters to help us detect 
whitespace (here I used `'`).
   c. most of all, include all relevant context in the same message (here, the 
FQ table ID)
   ```
   log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, 
tableName, snapId(), mdPath())
   ```



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.gobblin.util.HadoopUtils;
+
+/**
+ * Finds {@link IcebergDataset}s. Will look for tables in a database using a 
{@link IcebergCatalog},
+ * and creates a {@link IcebergDataset} for each one.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDataset> {
+
+  public static final String ICEBERG_DATASET_PREFIX = 
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
+  public static final String ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + 
".database.name";
+  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + 
".table.name";
+
+  private String dbName;
+  private String tblName;
+  private final Properties properties;
+  protected final FileSystem fs;
+
+  /**
+   * Finds all {@link IcebergDataset}s in the file system using the Iceberg 
Catalog.
+   * @return List of {@link IcebergDataset}s in the file system.
+   * @throws IOException
+   */
+  @Override
+  public List<IcebergDataset> findDatasets() throws IOException {
+    List<IcebergDataset> matchingDatasets = new ArrayList<>();
+    /*
+     * Both Iceberg database name and table name are mandatory based on 
current implementation.

Review Comment:
   better to identify which props are mandatory in the method javadoc, rather 
than buried here.  we aspire for users to learn what they need to know by 
reading docs, not hunting around in our impl



##########
gobblin-data-management/src/test/java/org/apache/gobblin/runtime/embedded/EmbeddedGobblinDistcpTest.java:
##########
@@ -386,4 +386,4 @@ public Set<Characteristic> applicableCharacteristics() {
     }
   }
 
-}
+}

Review Comment:
   it's still showing up, even now.  let's just remove this file from the 
commit, since it appears you have nothing to change here anyway



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());
+
+
+    for (CopyableFile.Builder builder : 
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      log.info("Adding file path: {} to the collection of copy entities and 
copying to destination: {}",
+          fileEntity.getFileStatus().getPath(), 
fileEntity.getDestination().toString());
+      copyEntities.add(fileEntity);
+    }
+    return copyEntities;
+  }
+
+  /**
+   * Get builders for a {@link CopyableFile} for each file referred to by a 
{@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+   */
+  protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, 
FileStatus> paths, CopyConfiguration configuration) throws IOException {
+
+    List<CopyableFile.Builder> builders = Lists.newArrayList();
+    List<SourceAndDestination> dataFiles = Lists.newArrayList();
+    Configuration defaultHadoopConfiguration = new Configuration();
+    FileSystem actualSourceFs;
+
+    for(Map.Entry<Path, FileStatus> entry : paths.entrySet()) {
+      dataFiles.add(new SourceAndDestination(entry.getValue(), 
this.fs.makeQualified(entry.getKey())));
+    }
+
+    for(SourceAndDestination sourceAndDestination : dataFiles) {
+      actualSourceFs = 
sourceAndDestination.getSource().getPath().getFileSystem(defaultHadoopConfiguration);
+
+      // TODO Add ancestor owner and permissions in future releases
+      builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, 
sourceAndDestination.getSource(),
+          sourceAndDestination.getDestination(), configuration));
+    }
+    return builders;
+  }
+  /**
+   * Finds all files read by the Iceberg table including metadata json file, 
manifest files, nested manifest file paths and actual data files.
+   * Returns a map of path, file status for each file that needs to be copied

Review Comment:
   let's think ahead to delta calculation... this should be all paths that 
exist in the snapshot.  the caller will determine what actually "needs to be 
copied"



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());

Review Comment:
   if the intent of logging here is to contrast the number of possible paths 
with the number we determine actually need copying given the delta between 
source and dest, that's a good call.  if so leave a line here, just more 
information-dense:
   ```
   log.info("{}.{} - found {} candidate source paths", dbName, tableName, mapOfPathsToCopy.size());
   ```



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+    log.info("Found {} to be copied over to the destination", 
mapOfPathsToCopy.size());

Review Comment:
   what do you feel this line gives (here specifically)?
   
   more typical would be to log the count of `CopyEntity` at the end, just 
before returning, around line 149.
   
   as always, context is paramount to logging!  in this situation, include the 
db and table name.
   
   finally try to be more terse, less conversational, e.g.:
   ```
   log.info("{}.{} - generated {} copy entities", dbName, tableName, 
copyEntities.size())
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.api.client.util.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+
+
+public class
+IcebergDatasetTest {
+
+  static final String METADATA_PATH = "/root/iceberg/test/metadata";
+  static final String MANIFEST_PATH = 
"/root/iceberg/test/metadata/test_manifest";
+  static final String MANIFEST_LIST_PATH = 
"/root/iceberg/test/metadata/test_manifest/data";
+  static final String MANIFEST_FILE_PATH1 = 
"/root/iceberg/test/metadata/test_manifest/data/a";
+  static final String MANIFEST_FILE_PATH2 = 
"/root/iceberg/test/metadata/test_manifest/data/b";
+
+  @Test
+  public void testGetFilePaths() throws IOException {
+
+    List<String> pathsToCopy = new ArrayList<>();
+    pathsToCopy.add(MANIFEST_FILE_PATH1);
+    pathsToCopy.add(MANIFEST_FILE_PATH2);
+    Map<Path, FileStatus> expected = Maps.newHashMap();
+    expected.put(new Path(MANIFEST_FILE_PATH1), null);
+    expected.put(new Path(MANIFEST_FILE_PATH2), null);
+
+    IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    IcebergSnapshotInfo icebergSnapshotInfo = 
Mockito.mock(IcebergSnapshotInfo.class);
+
+    
Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+    Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+    IcebergDataset icebergDataset = new IcebergDataset("test_db_name", 
"test_tbl_name", icebergTable, new Properties(), fs);
+
+    Map<Path, FileStatus> actual = icebergDataset.getFilePaths();
+    Assert.assertEquals(actual, expected);
+  }
+
+  /**
+   * Test case to generate copy entities for all the file paths for a mocked 
iceberg table.
+   * The assumption here is that we create copy entities for all the matching 
file paths,
+   * without calculating any difference between the source and destination
+   */
+  @Test
+  public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException, 
URISyntaxException {
+
+    FileSystem fs = Mockito.mock(FileSystem.class);
+    String test_db_name = "test_db_name";
+    String test_table_name = "test_tbl_name";
+    String test_qualified_path = 
"/root/iceberg/test/destination/sub_path_destination";
+    String test_uri_path = "/root/iceberg/test/output";
+    Properties properties = new Properties();
+    properties.setProperty("data.publisher.final.dir", "/test");
+    List<String> expected = new ArrayList<>(Arrays.asList(METADATA_PATH, 
MANIFEST_PATH, MANIFEST_LIST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
+
+    CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, 
properties)
+        .preserve(PreserveAttributes.fromMnemonicString(""))
+        .copyContext(new CopyContext())
+        .build();
+
+    List<String> listedManifestFilePaths = Arrays.asList(MANIFEST_FILE_PATH1, 
MANIFEST_FILE_PATH2);
+    IcebergSnapshotInfo.ManifestFileInfo manifestFileInfo = new 
IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_LIST_PATH, 
listedManifestFilePaths);
+    List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles = 
Arrays.asList(manifestFileInfo);
+    IcebergTable icebergTable = new MockedIcebergTable(METADATA_PATH, 
MANIFEST_PATH, manifestFiles);
+    IcebergDataset icebergDataset = new IcebergDataset(test_db_name, 
test_table_name, icebergTable, new Properties(), fs);
+    MockDestinationFileSystem mockDestinationFileSystem = new 
MockDestinationFileSystem();
+    mockDestinationFileSystem.addPath(METADATA_PATH);
+    mockDestinationFileSystem.addPath(MANIFEST_PATH);
+    mockDestinationFileSystem.addPath(MANIFEST_LIST_PATH);
+    mockDestinationFileSystem.addPath(MANIFEST_FILE_PATH1);
+    mockDestinationFileSystem.addPath(MANIFEST_FILE_PATH2);
+
+    mockFileSystemMethodCalls(fs, mockDestinationFileSystem.pathToFileStatus, 
test_qualified_path, test_uri_path);
+
+    Collection<CopyEntity> copyEntities = 
icebergDataset.generateCopyEntities(copyConfiguration);
+    verifyCopyEntities(copyEntities, expected);
+
+  }
+
+  private void verifyCopyEntities(Collection<CopyEntity> copyEntities, 
List<String> expected) {
+    List<String> actual = new ArrayList<>();
+    for (CopyEntity copyEntity : copyEntities) {
+      String json = copyEntity.toString();
+      JsonObject jsonObject = new Gson().fromJson(json, JsonObject.class);
+      JsonObject objectData =
+          
jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
+      JsonObject pathObject = 
objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
+      String filepath = 
pathObject.getAsJsonPrimitive("object-data").getAsString();
+      actual.add(filepath);
+    }
+    Assert.assertEquals(actual.size(), expected.size());
+    Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
+  }
+
+  private void mockFileSystemMethodCalls(FileSystem fs, Map<Path, FileStatus> 
pathToFileStatus, String qualifiedPath, String uriPath)
+      throws URISyntaxException, IOException {
+
+    Mockito.when(fs.getUri()).thenReturn(new URI(null, null, uriPath, null));
+    for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
+      Path path = entry.getKey();
+      FileStatus fileStatus = entry.getValue();
+      Mockito.when(fs.getFileStatus(path)).thenReturn(fileStatus);
+      Mockito.when(fs.makeQualified(path)).thenReturn(new Path(qualifiedPath));
+    }
+  }
+
+  private static class MockedIcebergTable extends IcebergTable {
+
+    String metadataPath;
+    String manifestListPath;
+    List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+
+    public MockedIcebergTable(String metadataPath, String manifestListPath, 
List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles) {
+      super(null);
+      this.metadataPath = metadataPath;
+      this.manifestListPath = manifestListPath;
+      this.manifestFiles = manifestFiles;
+    }
+
+    @Override
+    public IcebergSnapshotInfo getCurrentSnapshotInfo() {
+      Long snapshotId = 0L;
+      Instant timestamp  = Instant.ofEpochMilli(0L);
+      return new IcebergSnapshotInfo(snapshotId, timestamp, metadataPath, 
manifestListPath, manifestFiles);
+    }
+  }
+
+  private static class MockDestinationFileSystem {
+    Map<Path, FileStatus> pathToFileStatus;

Review Comment:
   this isn't actually itself a mock, so the name is misleading.  while you 
could encapsulate the entirety of the mocking into this class, it's also fine 
not to...
   
   when you don't go all the way, please just name it appropriately.  e.g. 
`DestinationFileSystemPaths`.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iceberg dataset implementing {@link CopyableDataset}.
+ */
+@Slf4j
+@Getter
+public class IcebergDataset implements PrioritizedCopyableDataset {
+  private final String dbName;
+  private final String inputTableName;
+  private IcebergTable icebergTable;
+  protected Properties properties;
+  protected FileSystem fs;
+
+  private Optional<String> sourceMetastoreURI;
+  private Optional<String> targetMetastoreURI;
+
+  /** Target metastore URI */
+  public static final String TARGET_METASTORE_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + 
".copy.target.metastore.uri";
+  /** Target database name */
+  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+
+  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem fs) {
+    this.dbName = db;
+    this.inputTableName = table;
+    this.icebergTable = icebergTbl;
+    this.properties = properties;
+    this.fs = fs;
+    this.sourceMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
+    this.targetMetastoreURI =
+        
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+  }
+
+  public IcebergDataset(String db, String table) {
+    this.dbName = db;
+    this.inputTableName = table;
+  }
+
+  /**
+   * Represents a source {@link FileStatus} and a {@link Path} destination.
+   */
+  @Data
+  private static class SourceAndDestination {
+    private final FileStatus source;
+    private final Path destination;
+  }
+
+  @Override
+  public String datasetURN() {
+    // TODO: verify!
+    return this.dbName + "." + this.inputTableName;
+  }
+
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
+    return getCopyEntities(configuration);
+  }
+  /**
+   * Finds all files read by the table and generates CopyableFiles.
+   * For the specific semantics see {@link #getCopyEntities}.
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration,
+      Comparator<FileSet<CopyEntity>> prioritizer, 
PushDownRequestor<FileSet<CopyEntity>> requestor) {
+    // TODO: Implement PushDownRequestor and priority based copy entity 
iteration
+    return getCopyEntities(configuration);
+  }
+
+  /**
+   * Finds all files read by the table and generates {@link CopyEntity}s for 
duplicating the table.
+   */
+  Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) {
+    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, configuration);
+    return Iterators.singletonIterator(fileSet);  }
+
+  /**
+   * Finds all files read by the table file set and generates {@link 
CopyEntity}s for duplicating the table.
+   */
+  @VisibleForTesting
+  Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) 
throws IOException {
+    String fileSet = this.getInputTableName();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    log.info("Fetching all the files to be copied");
+    Map<Path, FileStatus> mapOfPathsToCopy = getFilePaths();
+
+    log.info("Fetching copyable file builders from their respective file sets 
and adding to collection of copy entities");
+    for (CopyableFile.Builder builder : 
getCopyableFilesFromPaths(mapOfPathsToCopy, configuration)) {
+      CopyableFile fileEntity =
+          
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
+      fileEntity.setSourceData(getSourceDataset());
+      fileEntity.setDestinationData(getDestinationDataset());
+      copyEntities.add(fileEntity);
+    }
+    return copyEntities;
+  }
+
+  /**
+   * Get builders for a {@link CopyableFile} for each file referred to by a 
{@link org.apache.hadoop.hive.metastore.api.StorageDescriptor}.
+   */
+  protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, 
FileStatus> paths, CopyConfiguration configuration) throws IOException {
+
+    List<CopyableFile.Builder> builders = Lists.newArrayList();
+    List<SourceAndDestination> dataFiles = Lists.newArrayList();
+    Configuration hadoopConfiguration = new Configuration();
+    FileSystem actualSourceFs;
+
+    for(Map.Entry<Path, FileStatus> entry : paths.entrySet()) {
+      dataFiles.add(new SourceAndDestination(entry.getValue(), 
this.fs.makeQualified(entry.getKey())));
+    }
+
+    for(SourceAndDestination sourceAndDestination : dataFiles) {
+      actualSourceFs = 
sourceAndDestination.getSource().getPath().getFileSystem(hadoopConfiguration);
+
+      // TODO Add ancestor owner and permissions in future releases
+      builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, 
sourceAndDestination.getSource(),
+          sourceAndDestination.getDestination(), configuration));
+    }
+    return builders;
+  }
+  /**
+   * Finds all files read by the Iceberg table including metadata json file, 
manifest files, nested manifest file paths and actual data files.
+   * Returns a map of path, file status for each file that needs to be copied
+   */
+  protected Map<Path, FileStatus> getFilePaths() throws IOException {
+    Map<Path, FileStatus> result = Maps.newHashMap();
+    IcebergTable icebergTable = this.getIcebergTable();
+    IcebergSnapshotInfo icebergSnapshotInfo = 
icebergTable.getCurrentSnapshotInfo();
+
+    log.info("Fetching all file paths for the current snapshot of the Iceberg 
table");
+    List<String> pathsToCopy = icebergSnapshotInfo.getAllPaths();
+
+    for(String pathString : pathsToCopy) {
+      Path path = new Path(pathString);
+      result.put(path, this.fs.getFileStatus(path));

Review Comment:
   perhaps will is asking whether a single FNFE or similar should unwind the 
entire stack and fail us overall, or whether some could be tolerated and we 
continue on.
   
   there's probably a combo, where some individual failures should fail 
overall, but others are tolerable--or would even lead us back to reread a fresh 
`IcebergSnapshotInfo`. we'll refine as we go, so it's ok as you say currently 
to fail overall for 100% of the cases.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 809626)
    Time Spent: 7.5h  (was: 7h 20m)

> Create work units for Hive Catalog based Iceberg Datasets to support Distcp 
> for Iceberg
> ---------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1709
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1709
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: distcp-ng
>            Reporter: Meeth Gala
>            Assignee: Issac Buenrostro
>            Priority: Major
>          Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> We want to support Distcp for Iceberg based datasets. 
> As a pilot, we are starting with Hive Catalog and will expand the 
> functionality to cover all Iceberg based datasets.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to