aokolnychyi edited a comment on pull request #1525:
URL: https://github.com/apache/iceberg/pull/1525#issuecomment-730518594
I have two big questions:
- Shall we separate SNAPSHOT and MIGRATE into different classes? They only partially overlap, and the shared logic can be factored out into a utility class or a base class. It seems it would be simpler to have two separate classes: snapshot should know nothing about renaming original tables (a rough sketch of what I mean is at the end of this comment).
- Shall we use the V2 Catalog API wherever possible?

I tried to use the V2 API for snapshot, and this is what I got:
```
public class SnapshotTableAction implements Action<Long> {

  private final SparkSession spark;

  // source table config
  private final TableCatalog sourceCatalog;
  private final Identifier sourceIdent;
  private final Table sourceTable;

  // target table config
  private final StagingTableCatalog targetCatalog;
  private final Identifier targetIdent;

  public SnapshotTableAction(SparkSession spark, TableCatalog sourceCatalog, Identifier sourceIdent,
                             TableCatalog targetCatalog, Identifier targetIdent) {
    Preconditions.checkArgument(isIcebergCatalog(targetCatalog), "Target catalog is not Iceberg catalog");

    this.spark = spark;
    this.sourceCatalog = sourceCatalog;
    this.sourceIdent = sourceIdent;
    this.sourceTable = loadTable(sourceCatalog, sourceIdent);
    this.targetCatalog = (StagingTableCatalog) targetCatalog;
    this.targetIdent = targetIdent;
  }

  @Override
  public Long execute() {
    checkSourceTable();

    StagedTable targetTable = stageTargetTable();
    org.apache.iceberg.Table targetIcebergTable = asIcebergTable(targetTable);

    try {
      assignDefaultNameMapping(targetIcebergTable);

      // TODO: cast to v1 table until we know how to build SparkPartitions ourselves
      TableIdentifier v1Identifier = Spark3Util.toTableIdentifier(sourceIdent);
      String stagingLocation = stagingLocation(targetIcebergTable);
      SparkTableUtil.importSparkTable(spark, v1Identifier, targetIcebergTable, stagingLocation);

      Snapshot snapshot = targetIcebergTable.currentSnapshot();
      String totalDataFilesAsString = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
      return Long.parseLong(totalDataFilesAsString);

    } catch (Exception e) {
      targetCatalog.dropTable(targetIdent);
      throw new RuntimeException("Error msg", e);
    }
  }

  private boolean isIcebergCatalog(TableCatalog catalog) {
    return catalog instanceof SparkSessionCatalog || catalog instanceof SparkCatalog;
  }

  private Table loadTable(TableCatalog catalog, Identifier ident) {
    try {
      return catalog.loadTable(ident);
    } catch (NoSuchTableException e) {
      throw new IllegalArgumentException("Cannot load table", e);
    }
  }

  private void checkSourceTable() {
    // TODO: check provider and other configs (can cast to V1Table if not enough info in properties)
  }

  private StagedTable stageTargetTable() {
    try {
      Map<String, String> props = targetTableProps();
      StructType schema = sourceTable.schema();
      Transform[] partitioning = sourceTable.partitioning();
      return targetCatalog.stageCreate(targetIdent, schema, partitioning, props);
    } catch (NoSuchNamespaceException e) {
      throw new IllegalArgumentException("Cannot create a new table in a namespace which does not exist", e);
    } catch (TableAlreadyExistsException e) {
      throw new IllegalArgumentException("Destination table already exists", e);
    }
  }

  private Map<String, String> targetTableProps() {
    // TODO: prepare new properties based on the source table, override location
    return null;
  }

  private String stagingLocation(org.apache.iceberg.Table table) {
    // TODO: return the metadata location
    return null;
  }

  private void assignDefaultNameMapping(org.apache.iceberg.Table table) {
    // TODO: assign default name mapping
  }

  private org.apache.iceberg.Table asIcebergTable(Table table) {
    return ((SparkTable) table).table();
  }
}
```
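
To make the first point concrete, here is a minimal sketch of the split I have in mind, assuming a shared base class. All names below are hypothetical, not code from this PR, and the shared staging/import logic (essentially the `execute()` body above) is stubbed out:

```
// Hypothetical sketch: the shared staging/import logic moves into a base class,
// so the snapshot action knows nothing about renaming or dropping the original table.
abstract class BaseTableCreationAction implements Action<Long> {
  protected final SparkSession spark;
  protected final TableCatalog sourceCatalog;
  protected final Identifier sourceIdent;
  protected final StagingTableCatalog targetCatalog;
  protected final Identifier targetIdent;

  protected BaseTableCreationAction(SparkSession spark, TableCatalog sourceCatalog, Identifier sourceIdent,
                                    StagingTableCatalog targetCatalog, Identifier targetIdent) {
    this.spark = spark;
    this.sourceCatalog = sourceCatalog;
    this.sourceIdent = sourceIdent;
    this.targetCatalog = targetCatalog;
    this.targetIdent = targetIdent;
  }

  // shared logic: stage the target Iceberg table, import the source data files into it,
  // and return the number of imported data files (placeholder here)
  protected long stageAndImportSourceTable() {
    return 0L;
  }
}

class SnapshotTableAction extends BaseTableCreationAction {
  SnapshotTableAction(SparkSession spark, TableCatalog sourceCatalog, Identifier sourceIdent,
                      StagingTableCatalog targetCatalog, Identifier targetIdent) {
    super(spark, sourceCatalog, sourceIdent, targetCatalog, targetIdent);
  }

  @Override
  public Long execute() {
    // snapshot: import only, the source table is left untouched
    return stageAndImportSourceTable();
  }
}

class MigrateTableAction extends BaseTableCreationAction {
  MigrateTableAction(SparkSession spark, TableCatalog sourceCatalog, Identifier sourceIdent,
                     StagingTableCatalog targetCatalog, Identifier targetIdent) {
    super(spark, sourceCatalog, sourceIdent, targetCatalog, targetIdent);
  }

  @Override
  public Long execute() {
    // migrate: import, then rename the original table out of the way so the
    // new Iceberg table can take over its name
    long importedFiles = stageAndImportSourceTable();
    renameAndBackUpSourceTable();
    return importedFiles;
  }

  private void renameAndBackUpSourceTable() {
    // migrate-specific logic, intentionally kept out of SnapshotTableAction
  }
}
```

The base class could just as well be a utility class; the point is only that the rename/backup step stays out of the snapshot path.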