This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new fc1365ac3aa [HUDI-9377] feat(datahub-sync): adds DataPlatformInstance
aspect (#13268)
fc1365ac3aa is described below
commit fc1365ac3aa64c334e9650ba630c8ce2f45e40f0
Author: Sergio Gómez Villamor <[email protected]>
AuthorDate: Wed May 7 02:30:07 2025 +0200
[HUDI-9377] feat(datahub-sync): adds DataPlatformInstance aspect (#13268)
---
.../hudi/sync/datahub/DataHubSyncClient.java | 67 +++++++++++++----
.../sync/datahub/config/DataHubSyncConfig.java | 12 +++
.../config/HoodieDataHubDatasetIdentifier.java | 62 ++++++++++++++--
.../hudi/sync/datahub/TestDataHubSyncClient.java | 85 +++++++++++++++++++++-
.../config/TestHoodieDataHubDatasetIdentifier.java | 53 ++++++++++++++
5 files changed, 253 insertions(+), 26 deletions(-)
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 594ebb37688..4c0cd13da77 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -32,6 +32,8 @@ import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil;
import com.linkedin.common.BrowsePathEntry;
import com.linkedin.common.BrowsePathEntryArray;
import com.linkedin.common.BrowsePathsV2;
+import com.linkedin.common.DataPlatformInstance;
+import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.Status;
import com.linkedin.common.SubTypes;
import com.linkedin.common.UrnArray;
@@ -71,6 +73,9 @@ public class DataHubSyncClient extends HoodieSyncClient {
private static final Logger LOG =
LoggerFactory.getLogger(DataHubSyncClient.class);
protected final DataHubSyncConfig config;
+ private final DataPlatformUrn dataPlatformUrn;
+ private final Option<String> dataPlatformInstance;
+ private final Option<Urn> dataPlatformInstanceUrn;
private final DatasetUrn datasetUrn;
private final Urn databaseUrn;
private final String tableName;
@@ -81,7 +86,10 @@ public class DataHubSyncClient extends HoodieSyncClient {
super(config);
this.config = config;
HoodieDataHubDatasetIdentifier datasetIdentifier =
- config.getDatasetIdentifier();
+ config.getDatasetIdentifier();
+ this.dataPlatformUrn = datasetIdentifier.getDataPlatformUrn();
+ this.dataPlatformInstance = datasetIdentifier.getDataPlatformInstance();
+ this.dataPlatformInstanceUrn =
datasetIdentifier.getDataPlatformInstanceUrn();
this.datasetUrn = datasetIdentifier.getDatasetUrn();
this.databaseUrn = datasetIdentifier.getDatabaseUrn();
this.tableName = datasetIdentifier.getTableName();
@@ -217,8 +225,8 @@ public class DataHubSyncClient extends HoodieSyncClient {
return containerProposal;
}
- private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn,
List<BrowsePathEntry> path) {
- BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(path);
+ private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn,
List<BrowsePathEntry> paths) {
+ BrowsePathEntryArray browsePathEntryArray = new
BrowsePathEntryArray(paths);
MetadataChangeProposalWrapper browsePathsProposal =
MetadataChangeProposalWrapper.builder()
.entityType(entityUrn.getEntityType())
.entityUrn(entityUrn)
@@ -228,6 +236,21 @@ public class DataHubSyncClient extends HoodieSyncClient {
return browsePathsProposal;
}
+  /**
+   * Builds an upsert proposal that attaches a {@code DataPlatformInstance} aspect to the given entity.
+   *
+   * <p>The aspect always carries the data platform URN held by this client; the instance URN is
+   * set only when this client holds a platform instance URN.
+   *
+   * @param entityUrn URN of the entity the aspect is emitted for
+   * @return the proposal wrapping the aspect
+   */
+  private MetadataChangeProposalWrapper createDataPlatformInstanceAspect(Urn entityUrn) {
+    DataPlatformInstance aspect = new DataPlatformInstance();
+    aspect.setPlatform(dataPlatformUrn);
+    // The instance is optional: leave it unset when no platform instance URN is available.
+    dataPlatformInstanceUrn.ifPresent(aspect::setInstance);
+
+    return MetadataChangeProposalWrapper.builder()
+        .entityType(entityUrn.getEntityType())
+        .entityUrn(entityUrn)
+        .upsert()
+        .aspect(aspect)
+        .build();
+  }
+
private MetadataChangeProposalWrapper createDomainAspect(Urn entityUrn) {
try {
Urn domainUrn = Urn.createFromString(config.getDomainIdentifier());
@@ -252,12 +275,18 @@ public class DataHubSyncClient extends HoodieSyncClient {
.aspect(new ContainerProperties().setName(databaseName))
.build();
+ List<BrowsePathEntry> paths = dataPlatformInstanceUrn.map(dpiUrn ->
Collections.singletonList(
+ new BrowsePathEntry().setUrn(dpiUrn).setId(dpiUrn.toString()))
+ ).orElse(Collections.emptyList());
+
Stream<MetadataChangeProposalWrapper> resultStream = Stream.of(
- containerEntityProposal,
- createSubTypeAspect(databaseUrn, "Database"),
- createBrowsePathsAspect(databaseUrn, Collections.emptyList()),
createStatusAspect(databaseUrn),
- config.attachDomain() ? createDomainAspect(databaseUrn) : null
- ).filter(Objects::nonNull);
+ containerEntityProposal,
+ createSubTypeAspect(databaseUrn, "Database"),
+ createDataPlatformInstanceAspect(databaseUrn),
+ createBrowsePathsAspect(databaseUrn, paths),
+ createStatusAspect(databaseUrn),
+ config.attachDomain() ? createDomainAspect(databaseUrn) : null
+ ).filter(Objects::nonNull);
return resultStream;
}
@@ -314,13 +343,23 @@ public class DataHubSyncClient extends HoodieSyncClient {
}
+ /**
+ * Assembles the proposals emitted for the dataset entity: status, sub-type ("Table"),
+ * data platform instance, browse paths, container, schema metadata and, when
+ * {@code config.attachDomain()} is true, the domain aspect.
+ *
+ * @return stream of the non-null proposals for {@code datasetUrn}
+ */
private Stream<MetadataChangeProposalWrapper> createDatasetEntity() {
+ BrowsePathEntry databasePath = new
BrowsePathEntry().setUrn(databaseUrn).setId(databaseName);
+ // Browse path is [platform instance, database] when an instance URN is present,
+ // otherwise just [database].
+ List<BrowsePathEntry> paths = dataPlatformInstanceUrn.map(dpiUrn -> {
+ List<BrowsePathEntry> list = new ArrayList<BrowsePathEntry>();
+ list.add(new BrowsePathEntry().setUrn(dpiUrn).setId(dpiUrn.toString()));
+ list.add(databasePath);
+ return list;
+ }
+ ).orElse(Collections.singletonList(databasePath));
+
Stream<MetadataChangeProposalWrapper> result = Stream.of(
- createStatusAspect(datasetUrn),
- createSubTypeAspect(datasetUrn, "Table"),
- createBrowsePathsAspect(datasetUrn, Collections.singletonList(new
BrowsePathEntry().setUrn(databaseUrn).setId(databaseName))),
- createContainerAspect(datasetUrn, databaseUrn),
- createSchemaMetadataAspect(tableName),
- config.attachDomain() ? createDomainAspect(datasetUrn) : null
+ createStatusAspect(datasetUrn),
+ createSubTypeAspect(datasetUrn, "Table"),
+ createDataPlatformInstanceAspect(datasetUrn),
+ createBrowsePathsAspect(datasetUrn, paths),
+ createContainerAspect(datasetUrn, databaseUrn),
+ createSchemaMetadataAspect(tableName),
+ config.attachDomain() ? createDomainAspect(datasetUrn) : null
).filter(Objects::nonNull);
return result;
}
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
index a54c7c85e48..2731563e04b 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
@@ -78,6 +78,13 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
.withDocumentation("String used to represent Hudi when creating its
corresponding DataPlatform entity "
+ "within Datahub");
+ /**
+ * Optional name of the DataHub platform instance. It has no default value; when it is set,
+ * HoodieDataHubDatasetIdentifier builds a DataPlatformInstance URN from it, prefixes the
+ * dataset name with it, and uses it instead of the dataset env as the database key instance.
+ */
+ public static final ConfigProperty<String>
META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME = ConfigProperty
+ .key("hoodie.meta.sync.datahub.dataplatform_instance.name")
+ .noDefaultValue()
+ .markAdvanced()
+ .withDocumentation("String used to represent Hudi instance when emitting
Container and Dataset entities "
+ + "with the corresponding DataPlatformInstance, only if given.");
+
public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_ENV =
ConfigProperty
.key("hoodie.meta.sync.datahub.dataset.env")
.defaultValue(DEFAULT_DATAHUB_ENV.name())
@@ -174,6 +181,10 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
+ "corresponding DataPlatform entity within Datahub")
public String dataPlatformName;
+ // Command-line counterpart of META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME; it is copied
+ // into the sync properties only when non-null.
+ @Parameter(names = {"--data-platform-instance-name"}, description =
"String used to represent Hudi instance when emitting Container and Dataset
entities "
+ + "with the corresponding DataPlatformInstance, only if given.")
+ public String dataPlatformInstanceName;
+
@Parameter(names = {"--dataset-env"}, description = "Which Datahub
Environment to use when pushing entities")
public String datasetEnv;
@@ -196,6 +207,7 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_TOKEN.key(),
emitterToken);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(),
emitterSupplierClass);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(),
dataPlatformName);
+
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(),
dataPlatformInstanceName);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(),
datasetEnv);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key(),
domainIdentifier);
// We want the default behavior of DataHubSync Tool when run as command
line to NOT suppress exceptions
diff --git
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index 3c964c1c6b5..055ed8f4b57 100644
---
a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++
b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -19,6 +19,8 @@
package org.apache.hudi.sync.datahub.config;
+import org.apache.hudi.common.util.Option;
+
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
@@ -29,6 +31,7 @@ import java.util.Properties;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;
@@ -43,6 +46,10 @@ public class HoodieDataHubDatasetIdentifier {
public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV;
protected final Properties props;
+ private final String dataPlatform;
+ private final DataPlatformUrn dataPlatformUrn;
+ private final Option<String> dataPlatformInstance;
+ private final Option<Urn> dataPlatformInstanceUrn;
private final DatasetUrn datasetUrn;
private final Urn databaseUrn;
private final String tableName;
@@ -55,20 +62,28 @@ public class HoodieDataHubDatasetIdentifier {
}
DataHubSyncConfig config = new DataHubSyncConfig(props);
+ this.dataPlatform =
config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME);
+ this.dataPlatformUrn = createDataPlatformUrn(this.dataPlatform);
+ this.dataPlatformInstance =
Option.ofNullable(config.getString(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME));
+ this.dataPlatformInstanceUrn = createDataPlatformInstanceUrn(
+ this.dataPlatformUrn,
+
Option.ofNullable(config.getString(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME))
+ );
this.datasetUrn = new DatasetUrn(
-
createDataPlatformUrn(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)),
- createDatasetName(config.getString(META_SYNC_DATABASE_NAME),
config.getString(META_SYNC_TABLE_NAME)),
-
FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
+ this.dataPlatformUrn,
+ createDatasetName(this.dataPlatformInstance,
config.getString(META_SYNC_DATABASE_NAME),
config.getString(META_SYNC_TABLE_NAME)),
+
FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
);
this.tableName = config.getString(META_SYNC_TABLE_NAME);
this.databaseName = config.getString(META_SYNC_DATABASE_NAME);
+ //
https://github.com/datahub-project/datahub/blob/0b105395e913cc47a59bdeed0c56d7c0d4b71b63/metadata-ingestion/src/datahub/emitter/mcp_builder.py#L69-L72
DatabaseKey databaseKey = DatabaseKey.builder()
-
.platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME))
- .instance(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
- .database(this.databaseName)
- .build();
+
.platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME))
+
.instance(this.dataPlatformInstance.orElse(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV)))
+ .database(this.databaseName)
+ .build();
this.databaseUrn = databaseKey.asUrn();
}
@@ -77,6 +92,22 @@ public class HoodieDataHubDatasetIdentifier {
return this.datasetUrn;
}
+  /**
+   * Returns the data platform name resolved from the sync configuration.
+   *
+   * @return the platform name that the data platform URN is built from
+   */
+  public String getDataPlatform() {
+    return dataPlatform;
+  }
+
+  /**
+   * Returns the data platform URN created from the configured platform name.
+   *
+   * @return the data platform URN
+   */
+  public DataPlatformUrn getDataPlatformUrn() {
+    return dataPlatformUrn;
+  }
+
+  /**
+   * Returns the configured platform instance name.
+   *
+   * @return the instance name, or an empty option when none is configured
+   */
+  public Option<String> getDataPlatformInstance() {
+    return dataPlatformInstance;
+  }
+
+  /**
+   * Returns the DataPlatformInstance URN derived from the platform URN and the instance name.
+   *
+   * @return the instance URN, or an empty option when no instance name is configured
+   */
+  public Option<Urn> getDataPlatformInstanceUrn() {
+    return dataPlatformInstanceUrn;
+  }
+
public Urn getDatabaseUrn() {
return this.databaseUrn;
}
@@ -93,7 +124,22 @@ public class HoodieDataHubDatasetIdentifier {
return new DataPlatformUrn(platformUrn);
}
- private static String createDatasetName(String databaseName, String
tableName) {
+  /**
+   * Builds the DataPlatformInstance URN of the form
+   * {@code urn:li:dataPlatformInstance:(<platformUrn>,<instance>)}.
+   *
+   * @param dataPlatformUrn URN of the data platform the instance belongs to
+   * @param dataPlatformInstance optional platform instance name
+   * @return the parsed URN, or an empty option when no instance name is given
+   * @throws IllegalArgumentException if the assembled string cannot be turned into a URN
+   */
+  private static Option<Urn> createDataPlatformInstanceUrn(DataPlatformUrn dataPlatformUrn, Option<String> dataPlatformInstance) {
+    if (!dataPlatformInstance.isPresent()) {
+      return Option.empty();
+    }
+
+    final String platform = dataPlatformUrn.toString();
+    final String instanceName = dataPlatformInstance.get();
+    final String rawUrn = String.format("urn:li:dataPlatformInstance:(%s,%s)", platform, instanceName);
+    try {
+      Urn parsedUrn = Urn.createFromString(rawUrn);
+      return Option.of(parsedUrn);
+    } catch (Exception e) {
+      // Surface any parsing failure as an argument error that names the offending string.
+      throw new IllegalArgumentException(String.format("Failed to create DataPlatformInstance URN from string: %s", rawUrn), e);
+    }
+  }
+
+ /**
+ * Builds the dataset name used in the dataset URN.
+ *
+ * @param dataPlatformInstance optional platform instance name
+ * @param databaseName database name
+ * @param tableName table name
+ * @return {@code <instance>.<database>.<table>} when an instance is present,
+ *         otherwise {@code <database>.<table>}
+ */
+ private static String createDatasetName(Option<String> dataPlatformInstance,
String databaseName, String tableName) {
+ if (dataPlatformInstance.isPresent()) {
+ return String.format("%s.%s.%s", dataPlatformInstance.get(),
databaseName, tableName);
+ }
return String.format("%s.%s", databaseName, tableName);
}
}
\ No newline at end of file
diff --git
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
index d9910bacab5..30d7eb8e405 100644
---
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
+++
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
@@ -47,10 +48,13 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -97,7 +101,7 @@ public class TestDataHubSyncClient {
}
@Test
- public void testUpdateTableSchemaInvokesRestEmitter() throws IOException {
+ public void testUpdateTableSchema() throws IOException {
Properties props = new Properties();
props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
DummyPartitionValueExtractor.class.getName());
props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
@@ -111,9 +115,82 @@ public class TestDataHubSyncClient {
DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props,
restEmitterMock);
DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
- dhClient.updateTableSchema("some_table", null/*, null*/);
- verify(restEmitterMock,
times(9)).emit(any(MetadataChangeProposalWrapper.class),
- Mockito.any());
+ dhClient.updateTableSchema("some_table", null);
+ ArgumentCaptor<MetadataChangeProposalWrapper> captor =
ArgumentCaptor.forClass(MetadataChangeProposalWrapper.class);
+ verify(restEmitterMock, times(11)).emit(captor.capture(), Mockito.any());
+
+ Map<String, String> capturedProposalsMap = captor.getAllValues().stream()
+ .collect(Collectors.toMap(
+ mcpw -> mcpw.getEntityUrn() + "+" + mcpw.getAspectName(),
+ mcpw -> mcpw.getAspect().toString()
+ ));
+
+ Map<String, String> expectedProposalsMap = new HashMap<>();
+
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+containerProperties",
"{name=default}");
+
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+subTypes",
"{typeNames=[Database]}");
+
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+dataPlatformInstance",
"{platform=urn:li:dataPlatform:hudi}");
+
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+browsePathsV2",
"{path=[]}");
+
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+status",
"{removed=false}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+status",
"{removed=false}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+subTypes",
"{typeNames=[Table]}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+dataPlatformInstance",
"{platform=urn:li:dataPlatform:hudi}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+browsePathsV2",
"{path=[{urn=urn:li:container:ca5c62a21c8486b650b16634aacd3996,
id=default}]}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+container",
"{container=urn:li:container:ca5c62a21c8486b650b16634aacd3996}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+schemaMetadata",
"{platformSchema={com.linkedin.schema.OtherSchema={rawSchema"
+ +
"={\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"}]}}},
schemaName=triprec, fields=[{nullable=false, fieldPath=[version=2.0]"
+ + ".[type=triprec].[type=long].ts, isPartOfKey=false,
type={type={com.linkedin.schema.NumberType={}}}, nativeDataType=long}],
version=0, platform=urn:li:dataPlatform:hudi"
+ + ", hash=fcb5c4d60382cb4d1dd4710810b42295}");
+
+ assertEquals(expectedProposalsMap, capturedProposalsMap);
+ }
+
+ /**
+ * Verifies the proposals emitted by updateTableSchema when a platform instance name
+ * ("test_instance") is configured: the dataPlatformInstance aspects carry the instance URN,
+ * the browse paths start with the instance URN, and the dataset name is prefixed with the
+ * instance name.
+ */
+ @Test
+ public void testUpdateTableSchemaWhenPlatformInstance() throws IOException {
+ Properties props = new Properties();
+ props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
DummyPartitionValueExtractor.class.getName());
+ props.put(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(),
"test_instance");
+ props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+ Mockito.when(
+ restEmitterMock.emit(any(MetadataChangeProposalWrapper.class),
Mockito.any())
+ ).thenReturn(
+
CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())
+ );
+
+ DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props,
restEmitterMock);
+ DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+ dhClient.updateTableSchema("some_table", null);
+ ArgumentCaptor<MetadataChangeProposalWrapper> captor =
ArgumentCaptor.forClass(MetadataChangeProposalWrapper.class);
+ // 11 emits expected: 5 container aspects plus 6 dataset aspects (see the expected map below).
+ verify(restEmitterMock, times(11)).emit(captor.capture(), Mockito.any());
+
+ // Key each captured proposal by "<entityUrn>+<aspectName>" so the comparison ignores emit order.
+ Map<String, String> capturedProposalsMap = captor.getAllValues().stream()
+ .collect(Collectors.toMap(
+ mcpw -> mcpw.getEntityUrn() + "+" + mcpw.getAspectName(),
+ mcpw -> mcpw.getAspect().toString()
+ ));
+
+ Map<String, String> expectedProposalsMap = new HashMap<>();
+
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+containerProperties",
"{name=default}");
+
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+subTypes",
"{typeNames=[Database]}");
+
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+dataPlatformInstance",
"{platform=urn:li:dataPlatform:hudi, instance="
+ +
"urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}");
+
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+browsePathsV2",
"{path=[{urn=urn:li:dataPlatformInstance:(urn:li:dataPlatform"
+ + ":hudi,test_instance),
id=urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}]}");
+
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+status",
"{removed=false}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+status",
"{removed=false}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+subTypes",
"{typeNames=[Table]}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+dataPlatformInstance",
"{platform=urn:li:dataPlatform:hudi, instance="
+ +
"urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+browsePathsV2",
"{path=[{urn=urn:li:dataPlatformInstance:(urn:li:dataPlatform"
+ + ":hudi,test_instance),
id=urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)},
{urn=urn:li:container:da9c1430eb7551811f4c0e11b911614d, id=default}]}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+container",
"{container=urn:li:container:da9c1430eb7551811f4c0e11b911614d}");
+
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+schemaMetadata",
"{platformSchema={com.linkedin.schema.OtherSchema={rawSchema"
+ +
"={\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"}]}}},
schemaName=triprec, fields=[{nullable=false, fieldPath=[version=2.0]"
+ + ".[type=triprec].[type=long].ts, isPartOfKey=false,
type={type={com.linkedin.schema.NumberType={}}}, nativeDataType=long}],
version=0, platform=urn:li:dataPlatform:hudi"
+ + ", hash=fcb5c4d60382cb4d1dd4710810b42295}");
+
+ assertEquals(expectedProposalsMap, capturedProposalsMap);
}
@Test
diff --git
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
index 52af11f0a85..9310944bb88 100644
---
a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
+++
b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
@@ -30,6 +30,7 @@ import java.util.Properties;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
import static
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -144,4 +145,56 @@ class TestHoodieDataHubDatasetIdentifier {
new HoodieDataHubDatasetIdentifier(props);
});
}
+
+ /**
+ * Exercises the identifier constructor in three configurations that reuse the same
+ * {@code props} object: platform instance plus env, platform instance only, and env only.
+ * The order of the property mutations below matters because each step builds on the last.
+ */
+ @Test
+ @DisplayName("Test constructor with platform instance")
+ void testConstructorWithPlatformInstance() {
+ String expectedDatabaseUrnWithPlatformInstance =
"urn:li:container:ee430d6d2a1fb6336b0e972809e41e55";
+ String expectedDatabaseUrnWithEnvAsInstance =
"urn:li:container:ec7465a48d93b5c5e57eca1f44febed5";
+
+ // Given both platform instance and env
+ props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+ props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table");
+ props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(),
"custom_platform");
+ props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(),
"custom_instance");
+ props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");
+
+ // When
+ HoodieDataHubDatasetIdentifier identifier = new
HoodieDataHubDatasetIdentifier(props);
+
+ // Then
+ assertEquals("custom_platform", identifier.getDataPlatform());
+ assertEquals("urn:li:dataPlatform:custom_platform",
identifier.getDataPlatformUrn().toString());
+ assertEquals("custom_instance",
identifier.getDataPlatformInstance().get());
+
assertEquals("urn:li:dataPlatformInstance:(urn:li:dataPlatform:custom_platform,custom_instance)",
identifier.getDataPlatformInstanceUrn().get().toString());
+ assertEquals(expectedDatabaseUrnWithPlatformInstance,
identifier.getDatabaseUrn().toString());
+
+ // Given platform instance only
+ props.remove(META_SYNC_DATAHUB_DATASET_ENV.key());
+
+ // When
+ identifier = new HoodieDataHubDatasetIdentifier(props);
+
+ // Then
+ // The database URN is expected to be unchanged: the platform instance, when present,
+ // takes the place of the env in the database key.
+ assertEquals("custom_platform", identifier.getDataPlatform());
+ assertEquals("urn:li:dataPlatform:custom_platform",
identifier.getDataPlatformUrn().toString());
+ assertEquals("custom_instance",
identifier.getDataPlatformInstance().get());
+
assertEquals("urn:li:dataPlatformInstance:(urn:li:dataPlatform:custom_platform,custom_instance)",
identifier.getDataPlatformInstanceUrn().get().toString());
+ assertEquals(expectedDatabaseUrnWithPlatformInstance,
identifier.getDatabaseUrn().toString());
+
+ // Given env only
+ props.remove(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key());
+ props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");
+
+ // When
+ identifier = new HoodieDataHubDatasetIdentifier(props);
+
+ // Then
+ // Without a platform instance the env ("PROD") is used in the database key,
+ // so a different database URN is expected.
+ assertEquals("custom_platform", identifier.getDataPlatform());
+ assertEquals("urn:li:dataPlatform:custom_platform",
identifier.getDataPlatformUrn().toString());
+ assertTrue(identifier.getDataPlatformInstance().isEmpty());
+ assertTrue(identifier.getDataPlatformInstanceUrn().isEmpty());
+ assertEquals(expectedDatabaseUrnWithEnvAsInstance,
identifier.getDatabaseUrn().toString());
+
+ }
}
\ No newline at end of file