aokolnychyi edited a comment on pull request #1525:
URL: https://github.com/apache/iceberg/pull/1525#issuecomment-730518594


   I have two big questions:
   - Shall we separate SNAPSHOT and MIGRATE into different classes? They only partially overlap, and the shared logic can be factored out into a utility class or a base class. Two separate classes seem simpler; snapshot should know nothing about renaming original tables.
   - Shall we use the V2 Catalog API wherever possible?
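   To illustrate the first point, the split could look roughly like this. This is a plain-Java sketch with hypothetical names (none of these classes exist in Iceberg or Spark); the point is only that the overlapping logic lives in a base class, while renaming the original table is known exclusively to MIGRATE:

   ```java
   // Hypothetical sketch: the shared staging/import logic sits in a base
   // class; SNAPSHOT and MIGRATE differ only in how they treat the source.
   abstract class BaseTableCreationAction {
     protected final String sourceName;
     protected final String targetName;

     BaseTableCreationAction(String sourceName, String targetName) {
       this.sourceName = sourceName;
       this.targetName = targetName;
     }

     // shared logic: stage an Iceberg target table and import source files
     protected long stageAndImport() {
       // would stage the table, import files, return the imported file count
       return 0L; // placeholder for the sketch
     }
   }

   // SNAPSHOT: creates an independent copy, never touches the source table
   class SnapshotAction extends BaseTableCreationAction {
     SnapshotAction(String sourceName, String targetName) {
       super(sourceName, targetName);
     }

     long execute() {
       return stageAndImport();
     }
   }

   // MIGRATE: replaces the source table in place, so only this class
   // knows about renaming the original table to a backup name
   class MigrateAction extends BaseTableCreationAction {
     private final String backupName;

     MigrateAction(String sourceName, String backupName) {
       super(sourceName, sourceName); // the target takes over the source name
       this.backupName = backupName;
     }

     long execute() {
       // would rename sourceName -> backupName here before staging
       return stageAndImport();
     }
   }
   ```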
   
   I tried to use the V2 API for snapshot, and this is what I got:
   
   ```java
   public class SnapshotTableAction implements Action<Long> {

     private final SparkSession spark;

     // source table config
     private final TableCatalog sourceCatalog;
     private final Identifier sourceIdent;
     private final Table sourceTable;

     // target table config
     private final StagingTableCatalog targetCatalog;
     private final Identifier targetIdent;

     public SnapshotTableAction(SparkSession spark, TableCatalog sourceCatalog, Identifier sourceIdent,
                                TableCatalog targetCatalog, Identifier targetIdent) {

       Preconditions.checkArgument(isIcebergCatalog(targetCatalog), "Target catalog is not an Iceberg catalog");

       this.spark = spark;
       this.sourceCatalog = sourceCatalog;
       this.sourceIdent = sourceIdent;
       this.sourceTable = loadTable(sourceCatalog, sourceIdent);
       this.targetCatalog = (StagingTableCatalog) targetCatalog;
       this.targetIdent = targetIdent;
     }

     @Override
     public Long execute() {
       checkSourceTable();

       StagedTable targetTable = stageTargetTable();
       org.apache.iceberg.Table targetIcebergTable = asIcebergTable(targetTable);

       try {
         assignDefaultNameMapping(targetIcebergTable);

         // TODO: cast to v1 table until we know how to build SparkPartitions ourselves
         TableIdentifier v1Identifier = Spark3Util.toTableIdentifier(sourceIdent);
         String stagingLocation = stagingLocation(targetIcebergTable);
         SparkTableUtil.importSparkTable(spark, v1Identifier, targetIcebergTable, stagingLocation);

         Snapshot snapshot = targetIcebergTable.currentSnapshot();
         String totalDataFilesAsString = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
         return Long.parseLong(totalDataFilesAsString);
       } catch (Exception e) {
         targetCatalog.dropTable(targetIdent);
         throw new RuntimeException("Cannot snapshot table " + sourceIdent, e);
       }
     }

     private boolean isIcebergCatalog(TableCatalog catalog) {
       return catalog instanceof SparkSessionCatalog || catalog instanceof SparkCatalog;
     }

     private Table loadTable(TableCatalog catalog, Identifier ident) {
       try {
         return catalog.loadTable(ident);
       } catch (NoSuchTableException e) {
         throw new IllegalArgumentException("Cannot load table " + ident, e);
       }
     }

     private void checkSourceTable() {
       // TODO: check provider and other configs (can cast to V1Table if not enough info in properties)
     }

     private StagedTable stageTargetTable() {
       try {
         Map<String, String> props = targetTableProps();
         StructType schema = sourceTable.schema();
         Transform[] partitioning = sourceTable.partitioning();
         return targetCatalog.stageCreate(targetIdent, schema, partitioning, props);
       } catch (NoSuchNamespaceException e) {
         throw new IllegalArgumentException("Cannot create a new table in a namespace which does not exist", e);
       } catch (TableAlreadyExistsException e) {
         throw new IllegalArgumentException("Destination table already exists", e);
       }
     }

     private Map<String, String> targetTableProps() {
       // TODO: prepare new properties based on the source table, override location
       return null;
     }

     private String stagingLocation(org.apache.iceberg.Table table) {
       // TODO: return the metadata location
       return null;
     }

     private void assignDefaultNameMapping(org.apache.iceberg.Table table) {
       // TODO: assign default name mapping
     }

     private org.apache.iceberg.Table asIcebergTable(Table table) {
       return ((SparkTable) table).table();
     }
   }
   ```
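   One more open point on the V2 path: `StagedTable` exposes `commitStagedChanges()` and `abortStagedChanges()`, so the failure cleanup could arguably go through the staged table itself rather than `dropTable`. A minimal sketch of that pattern, using a stand-in interface rather than the actual Spark classes:

   ```java
   import java.util.function.LongSupplier;

   // Stand-in for the relevant part of the V2 StagedTable contract
   interface Staged {
     void commitStagedChanges();
     void abortStagedChanges();
   }

   class StagedImport {
     // Run the import against a staged table: commit on success,
     // abort (which cleans up any staged state) on failure.
     static long run(Staged staged, LongSupplier importFiles) {
       try {
         long importedFiles = importFiles.getAsLong();
         staged.commitStagedChanges();
         return importedFiles;
       } catch (RuntimeException e) {
         staged.abortStagedChanges();
         throw e;
       }
     }
   }
   ```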


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to