This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9a6508eca Use DB-qualified table ID as `IcebergTable` dataset descriptor (#3834)
9a6508eca is described below
commit 9a6508ecadd45f46521cd9144d1b12fef461d1f1
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Nov 21 08:23:07 2023 -0800
Use DB-qualified table ID as `IcebergTable` dataset descriptor (#3834)
---
.../data/management/copy/iceberg/IcebergTable.java | 6 ++++--
.../data/management/copy/iceberg/IcebergDatasetTest.java | 15 +++++++++++++++
.../metrics/event/lineage/LineageEventBuilder.java | 4 ++--
.../apache/gobblin/metrics/event/lineage/LineageInfo.java | 8 ++++----
.../gobblin/metrics/event/lineage/LineageEventTest.java | 2 +-
5 files changed, 26 insertions(+), 9 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 4fe1840fd..a07f6ac89 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -67,6 +67,7 @@ public class IcebergTable {
this.tableId = tableId;
}
}
+
@Getter
private final TableIdentifier tableId;
private final TableOperations tableOps;
@@ -182,11 +183,12 @@ public class IcebergTable {
return Lists.newArrayList(manifestPathsIterable);
}
}
- protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
+
+ public DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
DatasetDescriptor descriptor = new DatasetDescriptor(
DatasetConstants.PLATFORM_ICEBERG,
URI.create(this.catalogUri),
- this.tableId.name()
+ this.tableId.toString() // use FQ ID, including table namespace
);
descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
return descriptor;
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index c1872cb4a..09238445c 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.TableOperations;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -61,6 +63,8 @@ import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyContext;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
import static org.mockito.Mockito.any;
@@ -109,6 +113,17 @@ public class IcebergDatasetTest {
copyConfigProperties.setProperty("data.publisher.final.dir", "/test");
}
+ @Test
+ public void testGetDatasetDescriptor() throws URISyntaxException {
+ TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
+ IcebergTable table = new IcebergTable(tableId, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
+ FileSystem mockFs = Mockito.mock(FileSystem.class);
+ Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
+ DatasetDescriptor expected = new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, URI.create(SRC_CATALOG_URI), tableId.toString());
+ expected.addMetadata(DatasetConstants.FS_URI, SRC_FS_URI.toString());
+ Assert.assertEquals(table.getDatasetDescriptor(mockFs), expected);
+ }
+
@Test
public void testGetFilePathsWhenDestEmpty() throws IOException {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
index 210c2b9db..12c993029 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageEventBuilder.java
@@ -43,7 +43,7 @@ import org.apache.gobblin.metrics.event.GobblinEventBuilder;
@Slf4j
public final class LineageEventBuilder extends GobblinEventBuilder {
- static final String LIENAGE_EVENT_NAMESPACE = getKey(NAMESPACE, "lineage");
+ static final String LINEAGE_EVENT_NAMESPACE = getKey(NAMESPACE, "lineage");
static final String SOURCE = "source";
static final String DESTINATION = "destination";
static final String LINEAGE_EVENT_TYPE = "LineageEvent";
@@ -56,7 +56,7 @@ public final class LineageEventBuilder extends GobblinEventBuilder {
private Descriptor destination;
public LineageEventBuilder(String name) {
- super(name, LIENAGE_EVENT_NAMESPACE);
+ super(name, LINEAGE_EVENT_NAMESPACE);
addMetadata(EVENT_TYPE, LINEAGE_EVENT_TYPE);
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index b167bac12..469ab9da4 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -239,7 +239,7 @@ public final class LineageInfo {
* Remove all lineage related properties from a state
*/
public static void purgeLineageInfo(State state) {
- state.removePropsWithPrefix(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
+ state.removePropsWithPrefix(LineageEventBuilder.LINEAGE_EVENT_NAMESPACE);
}
/**
@@ -253,7 +253,7 @@ public final class LineageInfo {
* Get the full lineage event name from a state
*/
public static String getFullEventName(State state) {
- return Joiner.on('.').join(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE, state.getProp(getKey(NAME_KEY)));
+ return Joiner.on('.').join(LineageEventBuilder.LINEAGE_EVENT_NAMESPACE, state.getProp(getKey(NAME_KEY)));
}
@@ -298,11 +298,11 @@ public final class LineageInfo {
}
/**
- * Prefix all keys with {@link LineageEventBuilder#LIENAGE_EVENT_NAMESPACE}
+ * Prefix all keys with {@link LineageEventBuilder#LINEAGE_EVENT_NAMESPACE}
*/
private static String getKey(Object... objects) {
Object[] args = new Object[objects.length + 1];
- args[0] = LineageEventBuilder.LIENAGE_EVENT_NAMESPACE;
+ args[0] = LineageEventBuilder.LINEAGE_EVENT_NAMESPACE;
System.arraycopy(objects, 0, args, 1, objects.length);
return LineageEventBuilder.getKey(args);
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
index e792a7f4e..e676bc80a 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -196,7 +196,7 @@ public class LineageEventTest {
private void verify(LineageEventBuilder event, String name, Descriptor source, Descriptor destination) {
Assert.assertEquals(event.getName(), name);
- Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
+ Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LINEAGE_EVENT_NAMESPACE);
Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), LineageEventBuilder.LINEAGE_EVENT_TYPE);
Assert.assertTrue(event.getSource().equals(source));
Assert.assertTrue(event.getDestination().equals(destination));