This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 97e845915 [GOBBLIN-1961] Allow `IcebergDatasetFinder` to use separate
names for source vs. destination-side DB and table (#3835)
97e845915 is described below
commit 97e845915b9374d82791b3782e4279531df429f3
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Nov 22 11:20:36 2023 -0800
[GOBBLIN-1961] Allow `IcebergDatasetFinder` to use separate names for
source vs. destination-side DB and table (#3835)
* Allow `IcebergDatasetFinder` to use separate names for source vs.
destination-side DB and table
* Adjust Mockito.verify to pass test
---
.../management/copy/iceberg/IcebergCatalog.java | 11 +++
.../management/copy/iceberg/IcebergDataset.java | 45 +++++------
.../copy/iceberg/IcebergDatasetFinder.java | 87 +++++++++++++++-------
.../copy/iceberg/IcebergRegisterStep.java | 23 ++++--
.../copy/iceberg/IcebergDatasetTest.java | 49 ++++++------
5 files changed, 133 insertions(+), 82 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index ac342e2e3..5794a4c03 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -20,14 +20,25 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.TableIdentifier;
/**
* Any catalog from which to access {@link IcebergTable}s.
*/
public interface IcebergCatalog {
+
IcebergTable openTable(String dbName, String tableName);
+
+ default IcebergTable openTable(TableIdentifier tableId) {
+ // CHALLENGE: clearly better to implement in the reverse direction -
`openTable(String, String)` in terms of `openTable(TableIdentifier)` -
+ // but challenging to do at this point, with multiple derived classes
already "in the wild" that implement `openTable(String, String)`
+ return openTable(tableId.namespace().toString(), tableId.name());
+ }
+
String getCatalogUri();
+
void initialize(Map<String, String> properties, Configuration configuration);
+
boolean tableAlreadyExists(IcebergTable icebergTable);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index a59fc3688..67a10a42d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -63,10 +63,8 @@ import
org.apache.gobblin.util.request_allocation.PushDownRequestor;
@Slf4j
@Getter
public class IcebergDataset implements PrioritizedCopyableDataset {
- private final String dbName;
- private final String inputTableName;
private final IcebergTable srcIcebergTable;
- /** Presumed destination {@link IcebergTable} exists */
+ /* CAUTION: *hopefully* `destIcebergTable` exists... although that's not
necessarily been verified yet */
private final IcebergTable destIcebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
@@ -75,9 +73,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
/** Destination database name */
public static final String DESTINATION_DATABASE_KEY =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".destination.database";
- public IcebergDataset(String db, String table, IcebergTable srcIcebergTable,
IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
- this.dbName = db;
- this.inputTableName = table;
+ public IcebergDataset(IcebergTable srcIcebergTable, IcebergTable
destIcebergTable, Properties properties, FileSystem sourceFs) {
this.srcIcebergTable = srcIcebergTable;
this.destIcebergTable = destIcebergTable;
this.properties = properties;
@@ -117,9 +113,9 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
return createFileSets(targetFs, configuration);
}
- /** @return unique ID for this dataset, usable as a {@link
CopyEntity}.fileset, for atomic publication grouping */
+ /** @return unique ID for dataset (based on the source-side table), usable
as a {@link CopyEntity#getFileSet}, for atomic publication grouping */
protected String getFileSetId() {
- return this.dbName + "." + this.inputTableName;
+ return this.srcIcebergTable.getTableId().toString();
}
/**
@@ -127,7 +123,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
* comprising the iceberg/table, so as to fully specify remaining table
replication.
*/
protected Iterator<FileSet<CopyEntity>> createFileSets(FileSystem targetFs,
CopyConfiguration configuration) {
- FileSet<CopyEntity> fileSet = new
IcebergTableFileSet(this.getInputTableName(), this, targetFs, configuration);
+ FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getFileSetId(),
this, targetFs, configuration);
return Iterators.singletonIterator(fileSet);
}
@@ -140,7 +136,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
String fileSet = this.getFileSetId();
List<CopyEntity> copyEntities = Lists.newArrayList();
Map<Path, FileStatus> pathToFileStatus =
getFilePathsToFileStatus(targetFs, copyConfig);
- log.info("~{}.{}~ found {} candidate source paths", dbName,
inputTableName, pathToFileStatus.size());
+ log.info("~{}~ found {} candidate source paths", fileSet,
pathToFileStatus.size());
Configuration defaultHadoopConfiguration = new Configuration();
for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
@@ -165,8 +161,8 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
copyEntities.add(fileEntity);
}
// TODO: Filter properties specific to iceberg registration and avoid
serializing every global property
- copyEntities.add(createPostPublishStep(this.dbName, this.inputTableName,
this.properties));
- log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName,
copyEntities.size());
+ copyEntities.add(createPostPublishStep());
+ log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
return copyEntities;
}
@@ -187,8 +183,8 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
IcebergSnapshotInfo currentSnapshotOverview =
icebergTable.getCurrentSnapshotInfoOverviewOnly();
if
(currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false)
&&
isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
- log.info("~{}.{}~ skipping entire iceberg, since snapshot '{}' at '{}'
and metadata '{}' both present on target",
- dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+ log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and
metadata '{}' both present on target",
+ this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
currentSnapshotOverview.getManifestListPath(),
currentSnapshotOverview.getMetadataPath().orElse("<<ERROR:
MISSING!>>"));
return Maps.newHashMap();
@@ -198,7 +194,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
// log each snapshot, for context, in case of
`FileNotFoundException` during `FileSystem.getFileStatus()`
String manListPath = snapshotInfo.getManifestListPath();
- log.info("~{}.{}~ loaded snapshot '{}' at '{}' from metadata path:
'{}'", dbName, inputTableName,
+ log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path:
'{}'", this.getFileSetId(),
snapshotInfo.getSnapshotId(), manListPath,
snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
// ALGO: an iceberg's files form a tree of four levels:
metadata.json -> manifest-list -> manifest -> data;
// most critically, all are presumed immutable and uniquely named,
although any may be replaced. we depend
@@ -224,18 +220,17 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
missingPaths.addAll(mfi.getListedFilePaths());
}
}
- log.info("~{}.{}~ snapshot '{}': collected {} additional source
paths",
- dbName, inputTableName, snapshotInfo.getSnapshotId(),
missingPaths.size());
+ log.info("~{}~ snapshot '{}': collected {} additional source
paths",
+ this.getFileSetId(), snapshotInfo.getSnapshotId(),
missingPaths.size());
return missingPaths.iterator();
} else {
- log.info("~{}.{}~ snapshot '{}' already present on target...
skipping (including contents)",
- dbName, inputTableName, snapshotInfo.getSnapshotId());
+ log.info("~{}~ snapshot '{}' already present on target... skipping
(including contents)",
+ this.getFileSetId(), snapshotInfo.getSnapshotId());
// IMPORTANT: separately consider metadata path, to handle case of
'metadata-only' snapshot reusing mf-list
Optional<String> metadataPath = snapshotInfo.getMetadataPath();
Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p
-> !isPresentOnTarget.apply(p));
metadataPath.ifPresent(ignore ->
- log.info("~{}.{}~ metadata IS {} already present on target",
dbName, inputTableName,
- nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
+ log.info("~{}~ metadata IS {} already present on target",
this.getFileSetId(), nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
);
return nonReplicatedMetadataPath.map(p ->
Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
}
@@ -255,7 +250,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
try {
results.put(path, this.sourceFs.getFileStatus(path));
if (growthTracker.isAnotherMilestone(results.size())) {
- log.info("~{}.{}~ collected file status on '{}' source paths",
dbName, inputTableName, results.size());
+ log.info("~{}~ collected file status on '{}' source paths",
this.getFileSetId(), results.size());
}
} catch (FileNotFoundException fnfe) {
if (!shouldTolerateMissingSourceFiles) {
@@ -265,7 +260,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
String total = ++numSourceFilesNotFound + " total";
String speculation = "either premature deletion broke time-travel
or metadata read interleaved among delete";
errorConsolidator.prepLogMsg(path).ifPresent(msg ->
- log.warn("~{}.{}~ source {} ({}... {})", dbName,
inputTableName, msg, speculation, total)
+ log.warn("~{}~ source {} ({}... {})", this.getFileSetId(),
msg, speculation, total)
);
}
}
@@ -326,8 +321,8 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
return this.destIcebergTable.getDatasetDescriptor(targetFs);
}
- private PostPublishStep createPostPublishStep(String dbName, String
inputTableName, Properties properties) {
- IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(dbName,
inputTableName, properties);
+ private PostPublishStep createPostPublishStep() {
+ IcebergRegisterStep icebergRegisterStep = new
IcebergRegisterStep(this.srcIcebergTable.getTableId(),
this.destIcebergTable.getTableId(), this.properties);
return new PostPublishStep(getFileSetId(), Maps.newHashMap(),
icebergRegisterStep, 0);
}
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index beded5a72..dc407f38c 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -55,22 +55,28 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
public static final String DEFAULT_ICEBERG_CATALOG_CLASS =
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
public static final String ICEBERG_CATALOG_KEY = "catalog";
/**
- * This is used with a prefix: "{@link
IcebergDatasetFinder#ICEBERG_DATASET_PREFIX}" + "." + "(source or destination)"
+ "." + "{@link IcebergDatasetFinder#ICEBERG_CATALOG_KEY}" + "..."
- * It is an open-ended pattern used to pass arbitrary catalog specific
properties
+ * This is used with a prefix: "{@link
IcebergDatasetFinder#ICEBERG_DATASET_PREFIX}" + "." + "( source | destination
)" + "." + "{@link IcebergDatasetFinder#ICEBERG_CATALOG_KEY}" + "..."
+ * It is an open-ended pattern used to pass arbitrary catalog-scoped
properties
*/
public static final String ICEBERG_CATALOG_CLASS_KEY = "class";
- public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX +
".database.name";
- public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX +
".table.name";
+ public static final String ICEBERG_DB_NAME_KEY = "database.name";
+ public static final String ICEBERG_TABLE_NAME_KEY = "table.name";
+ /** please use source/dest-scoped properties */
+ @Deprecated
+ public static final String ICEBERG_DB_NAME_LEGACY = ICEBERG_DATASET_PREFIX +
"." + ICEBERG_DB_NAME_KEY;
+ /** please use source/dest-scoped properties */
+ @Deprecated
+ public static final String ICEBERG_TABLE_NAME_LEGACY =
ICEBERG_DATASET_PREFIX + "." + ICEBERG_TABLE_NAME_KEY;
public enum CatalogLocation {
SOURCE,
DESTINATION;
/**
- * Provides prefix for configs based on the catalog location to filter
catalog specific properties
+ * Provides prefix for configs based on the catalog orientation (source or
destination) for catalog-targeted properties
*/
public String getConfigPrefix() {
- return ICEBERG_DATASET_PREFIX + "." + this.toString().toLowerCase() +
"." + ICEBERG_CATALOG_KEY + ".";
+ return ICEBERG_DATASET_PREFIX + "." + this.toString().toLowerCase() +
".";
}
}
@@ -86,20 +92,38 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
*/
@Override
public List<IcebergDataset> findDatasets() throws IOException {
- List<IcebergDataset> matchingDatasets = new ArrayList<>();
- if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) ||
StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
- throw new IllegalArgumentException(String.format("Iceberg database name:
{%s} or Iceberg table name: {%s} is missing",
- ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
+ String srcDbName = getLocationQualifiedProperty(properties,
CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY);
+ String destDbName = getLocationQualifiedProperty(properties,
CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY);
+ // TODO: eventually remove support for combo (src+dest) iceberg props, in
favor of separate source/dest-scoped props; for now, maintain support
+ if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(destDbName)) {
+ srcDbName = destDbName = properties.getProperty(ICEBERG_DB_NAME_LEGACY);
+ }
+ String srcTableName = getLocationQualifiedProperty(properties,
CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY);
+ String destTableName = getLocationQualifiedProperty(properties,
CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY);
+ // TODO: eventually remove support for combo (src+dest) iceberg props, in
favor of separate source/dest-scoped props; for now, maintain support
+ if (StringUtils.isBlank(srcTableName) ||
StringUtils.isBlank(destTableName)) {
+ srcTableName = destTableName =
properties.getProperty(ICEBERG_TABLE_NAME_LEGACY);
+ }
+ if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(srcTableName)) {
+ throw new IllegalArgumentException(
+ String.format("Missing (at least some) IcebergDataset properties -
source: ('%s' and '%s') and destination: ('%s' and '%s') "
+ + "or [deprecated!] common/combo: ('%s' and '%s')",
+ calcLocationQualifiedPropName(CatalogLocation.SOURCE,
ICEBERG_DB_NAME_KEY),
+ calcLocationQualifiedPropName(CatalogLocation.SOURCE,
ICEBERG_TABLE_NAME_KEY),
+ calcLocationQualifiedPropName(CatalogLocation.DESTINATION,
ICEBERG_DB_NAME_KEY),
+ calcLocationQualifiedPropName(CatalogLocation.DESTINATION,
ICEBERG_TABLE_NAME_KEY),
+ ICEBERG_DB_NAME_LEGACY,
+ ICEBERG_TABLE_NAME_LEGACY));
}
- String dbName = properties.getProperty(ICEBERG_DB_NAME);
- String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
- IcebergCatalog sourceIcebergCatalog =
createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
- IcebergCatalog destinationIcebergCatalog =
createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+ IcebergCatalog srcIcebergCatalog = createIcebergCatalog(this.properties,
CatalogLocation.SOURCE);
+ IcebergCatalog destIcebergCatalog = createIcebergCatalog(this.properties,
CatalogLocation.DESTINATION);
+
/* Each Iceberg dataset maps to an Iceberg table */
- matchingDatasets.add(createIcebergDataset(dbName, tblName,
sourceIcebergCatalog, destinationIcebergCatalog, this.properties,
this.sourceFs));
- log.info("Found {} matching datasets: {} for the database name: {} and
table name: {}", matchingDatasets.size(),
- matchingDatasets, dbName, tblName); // until future support added to
specify multiple icebergs, count expected always to be one
+ List<IcebergDataset> matchingDatasets = new ArrayList<>();
+ matchingDatasets.add(createIcebergDataset(srcIcebergCatalog, srcDbName,
srcTableName, destIcebergCatalog, destDbName, destTableName, this.properties,
this.sourceFs));
+ log.info("Found {} matching datasets: {} for the (source) '{}.{}' / (dest)
'{}.{}'", matchingDatasets.size(),
+ matchingDatasets, srcDbName, srcTableName, destDbName, destTableName);
// until future support added to specify multiple icebergs, count expected
always to be one
return matchingDatasets;
}
@@ -114,28 +138,39 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
}
/**
- * Requires both source and destination catalogs to connect to their
respective {@link IcebergTable}
+ * Uses each source and destination {@link IcebergCatalog} to load and
verify existence of the respective {@link IcebergTable}
* Note: the destination side {@link IcebergTable} should be present before
initiating replication
+ *
* @return {@link IcebergDataset} with its corresponding source and
destination {@link IcebergTable}
*/
- protected IcebergDataset createIcebergDataset(String dbName, String tblName,
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog,
Properties properties, FileSystem fs) throws IOException {
- IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName,
tblName);
-
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
- IcebergTable destIcebergTable =
destinationIcebergCatalog.openTable(dbName, tblName);
+ protected IcebergDataset createIcebergDataset(IcebergCatalog
sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog
destinationIcebergCatalog, String destDbName, String destTableName, Properties
properties, FileSystem fs) throws IOException {
+ IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName,
srcTableName);
+
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName,
srcTableName));
+ IcebergTable destIcebergTable =
destinationIcebergCatalog.openTable(destDbName, destTableName);
// TODO: Rethink strategy to enforce dest iceberg table
-
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName,
tblName));
- return new IcebergDataset(dbName, tblName, srcIcebergTable,
destIcebergTable, properties, fs);
+
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName,
destTableName));
+ return new IcebergDataset(srcIcebergTable, destIcebergTable, properties,
fs);
}
protected static IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
- String prefix = location.getConfigPrefix();
- Map<String, String> catalogProperties =
buildMapFromPrefixChildren(properties, prefix);
+ String catalogPrefix = calcLocationQualifiedPropName(location,
ICEBERG_CATALOG_KEY + ".");
+ Map<String, String> catalogProperties =
buildMapFromPrefixChildren(properties, catalogPrefix);
// TODO: Filter properties specific to Hadoop
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
String icebergCatalogClassName =
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
}
+ /** @return property value or `null` */
+ protected static String getLocationQualifiedProperty(Properties properties,
CatalogLocation location, String relativePropName) {
+ return properties.getProperty(calcLocationQualifiedPropName(location,
relativePropName));
+ }
+
+ /** @return absolute (`location`-qualified) property name for
`relativePropName` */
+ protected static String calcLocationQualifiedPropName(CatalogLocation
location, String relativePropName) {
+ return location.getConfigPrefix() + relativePropName;
+ }
+
/**
* Filters the properties based on a prefix using {@link
ConfigBuilder#loadProps(Properties, String)} and creates a {@link Map}
*/
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index 8f32f8cc0..8beb72e66 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -21,23 +21,30 @@ import java.io.IOException;
import java.util.Properties;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
-import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
+
/**
* {@link CommitStep} to perform Iceberg registration.
*/
@Slf4j
-@AllArgsConstructor
public class IcebergRegisterStep implements CommitStep {
- private final String dbName;
- private final String tblName;
+ // store as string for serializability... TODO: explore whether truly
necessary (or we could just as well store as `TableIdentifier`)
+ private final String srcTableIdStr;
+ private final String destTableIdStr;
private final Properties properties;
+ public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier
destTableId, Properties properties) {
+ this.srcTableIdStr = srcTableId.toString();
+ this.destTableIdStr = destTableId.toString();
+ this.properties = properties;
+ }
+
@Override
public boolean isCompleted() throws IOException {
return false;
@@ -46,12 +53,12 @@ public class IcebergRegisterStep implements CommitStep {
@Override
public void execute() throws IOException {
IcebergTable srcIcebergTable =
IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.SOURCE)
- .openTable(this.dbName, this.tblName);
+ .openTable(TableIdentifier.parse(srcTableIdStr));
IcebergTable destIcebergTable =
IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.DESTINATION)
- .openTable(this.dbName, this.tblName);
+ .openTable(TableIdentifier.parse(destTableIdStr));
TableMetadata destinationMetadata = null;
try {
- destinationMetadata = destIcebergTable.accessTableMetadata();
+ destinationMetadata = destIcebergTable.accessTableMetadata(); //
probe... (first access could throw)
} catch (IcebergTable.TableNotFoundException tnfe) {
log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
}
@@ -59,6 +66,6 @@ public class IcebergRegisterStep implements CommitStep {
}
@Override
public String toString() {
- return String.format("Registering Iceberg Table: {%s}.{%s} ", this.dbName,
this.tblName);
+ return String.format("Registering Iceberg Table: {%s} (dest); (src:
{%s})", this.destTableIdStr, this.srcTableIdStr);
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 09238445c..0b485e1df 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -193,17 +193,18 @@ public class IcebergDatasetTest {
validateGetFilePathsGivenDestState(icebergSnapshots,
existingDestPaths, expectedResultPaths);
// ensure short-circuiting was able to avert iceberg manifests scan
Mockito.verify(mockTable,
Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly();
+ Mockito.verify(mockTable, Mockito.times(1)).getTableId();
Mockito.verifyNoMoreInteractions(mockTable);
}
/** Exception wrapping is used internally--ensure that doesn't lapse into
silently swallowing errors */
@Test(expectedExceptions = IOException.class)
public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws
IOException {
- IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Lists.newArrayList(SNAPSHOT_PATHS_0));
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName),
Lists.newArrayList(SNAPSHOT_PATHS_0));
MockFileSystemBuilder sourceFsBuilder = new
MockFileSystemBuilder(SRC_FS_URI);
FileSystem sourceFs = sourceFsBuilder.build();
- IcebergDataset icebergDataset = new IcebergDataset(testDbName,
testTblName, icebergTable, null, new Properties(), sourceFs);
+ IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null,
new Properties(), sourceFs);
MockFileSystemBuilder destFsBuilder = new
MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destFsBuilder.build();
@@ -241,10 +242,10 @@ public class IcebergDatasetTest {
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
- IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
- IcebergDataset icebergDataset =
- new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
+ TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName,
testTblName);
+ IcebergTable srcIcebergTbl =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergTable destIcebergTbl =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_1));
+ IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl,
destIcebergTbl, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -267,10 +268,10 @@ public class IcebergDatasetTest {
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1,
SNAPSHOT_PATHS_0));
- IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
- IcebergDataset icebergDataset =
- new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
+ TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName,
testTblName);
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1,
SNAPSHOT_PATHS_0));
+ IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_1));
+ IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -298,9 +299,10 @@ public class IcebergDatasetTest {
sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
- IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
- IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);
+ TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName,
testTblName);
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_1));
+ IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -326,9 +328,10 @@ public class IcebergDatasetTest {
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
- IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
- IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);
+ TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName,
testTblName);
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_1));
+ IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -358,13 +361,12 @@ public class IcebergDatasetTest {
*/
protected IcebergTable
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths>
sourceSnapshotPathSets,
Optional<List<String>> optExistingSourcePaths, List<String>
existingDestPaths, Set<Path> expectedResultPaths) throws IOException {
- IcebergTable icebergTable =
MockIcebergTable.withSnapshots(sourceSnapshotPathSets);
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName),
sourceSnapshotPathSets);
MockFileSystemBuilder sourceFsBuilder = new
MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent());
optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
FileSystem sourceFs = sourceFsBuilder.build();
- IcebergDataset icebergDataset =
- new IcebergDataset(testDbName, testTblName, icebergTable, null, new
Properties(), sourceFs);
+ IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null,
new Properties(), sourceFs);
MockFileSystemBuilder destFsBuilder = new
MockFileSystemBuilder(DEST_FS_URI);
destFsBuilder.addPaths(existingDestPaths);
@@ -377,7 +379,7 @@ public class IcebergDatasetTest {
Assert.assertEquals(
filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()),
expectedResultPaths);
- return icebergTable;
+ return srcIcebergTable;
}
/** @return `paths` after adding to it all paths of every one of
`snapshotDefs` */
@@ -464,9 +466,9 @@ public class IcebergDatasetTest {
* Without this, so to lose the mock, we'd be unable to set up any source
paths as existing.
*/
protected static class TrickIcebergDataset extends IcebergDataset {
- public TrickIcebergDataset(String db, String table, IcebergTable
srcIcebergTbl, IcebergTable destIcebergTbl, Properties properties,
+ public TrickIcebergDataset(IcebergTable srcIcebergTable, IcebergTable
destIcebergTable, Properties properties,
FileSystem sourceFs) {
- super(db, table, srcIcebergTbl, destIcebergTbl, properties, sourceFs);
+ super(srcIcebergTable, destIcebergTable, properties, sourceFs);
}
@Override // as the `static` is not mock-able
@@ -581,8 +583,9 @@ public class IcebergDatasetTest {
}
}
- public static IcebergTable withSnapshots(List<SnapshotPaths>
snapshotPathSets) throws IOException {
+ public static IcebergTable withSnapshots(TableIdentifier tableId,
List<SnapshotPaths> snapshotPathSets) throws IOException {
IcebergTable table = Mockito.mock(IcebergTable.class);
+ Mockito.when(table.getTableId()).thenReturn(tableId);
int lastIndex = snapshotPathSets.size() - 1;
Mockito.when(table.getCurrentSnapshotInfoOverviewOnly())
.thenReturn(snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));