This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new fc1365ac3aa [HUDI-9377] feat(datahub-sync): adds DataPlatformInstance aspect (#13268)
fc1365ac3aa is described below

commit fc1365ac3aa64c334e9650ba630c8ce2f45e40f0
Author: Sergio Gómez Villamor <[email protected]>
AuthorDate: Wed May 7 02:30:07 2025 +0200

    [HUDI-9377] feat(datahub-sync): adds DataPlatformInstance aspect (#13268)
---
 .../hudi/sync/datahub/DataHubSyncClient.java       | 67 +++++++++++++----
 .../sync/datahub/config/DataHubSyncConfig.java     | 12 +++
 .../config/HoodieDataHubDatasetIdentifier.java     | 62 ++++++++++++++--
 .../hudi/sync/datahub/TestDataHubSyncClient.java   | 85 +++++++++++++++++++++-
 .../config/TestHoodieDataHubDatasetIdentifier.java | 53 ++++++++++++++
 5 files changed, 253 insertions(+), 26 deletions(-)

diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
index 594ebb37688..4c0cd13da77 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/DataHubSyncClient.java
@@ -32,6 +32,8 @@ import org.apache.hudi.sync.datahub.util.SchemaFieldsUtil;
 import com.linkedin.common.BrowsePathEntry;
 import com.linkedin.common.BrowsePathEntryArray;
 import com.linkedin.common.BrowsePathsV2;
+import com.linkedin.common.DataPlatformInstance;
+import com.linkedin.common.urn.DataPlatformUrn;
 import com.linkedin.common.Status;
 import com.linkedin.common.SubTypes;
 import com.linkedin.common.UrnArray;
@@ -71,6 +73,9 @@ public class DataHubSyncClient extends HoodieSyncClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(DataHubSyncClient.class);
 
   protected final DataHubSyncConfig config;
+  private final DataPlatformUrn dataPlatformUrn;
+  private final Option<String> dataPlatformInstance;
+  private final Option<Urn> dataPlatformInstanceUrn;
   private final DatasetUrn datasetUrn;
   private final Urn databaseUrn;
   private final String tableName;
@@ -81,7 +86,10 @@ public class DataHubSyncClient extends HoodieSyncClient {
     super(config);
     this.config = config;
     HoodieDataHubDatasetIdentifier datasetIdentifier =
-        config.getDatasetIdentifier();
+            config.getDatasetIdentifier();
+    this.dataPlatformUrn = datasetIdentifier.getDataPlatformUrn();
+    this.dataPlatformInstance = datasetIdentifier.getDataPlatformInstance();
+    this.dataPlatformInstanceUrn = 
datasetIdentifier.getDataPlatformInstanceUrn();
     this.datasetUrn = datasetIdentifier.getDatasetUrn();
     this.databaseUrn = datasetIdentifier.getDatabaseUrn();
     this.tableName = datasetIdentifier.getTableName();
@@ -217,8 +225,8 @@ public class DataHubSyncClient extends HoodieSyncClient {
     return containerProposal;
   }
 
-  private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, 
List<BrowsePathEntry> path) {
-    BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(path);
+  private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, 
List<BrowsePathEntry> paths) {
+    BrowsePathEntryArray browsePathEntryArray = new 
BrowsePathEntryArray(paths);
     MetadataChangeProposalWrapper browsePathsProposal = 
MetadataChangeProposalWrapper.builder()
         .entityType(entityUrn.getEntityType())
         .entityUrn(entityUrn)
@@ -228,6 +236,21 @@ public class DataHubSyncClient extends HoodieSyncClient {
     return browsePathsProposal;
   }
 
+  private MetadataChangeProposalWrapper createDataPlatformInstanceAspect(Urn 
entityUrn) {
+    DataPlatformInstance dataPlatformInstanceAspect = new 
DataPlatformInstance().setPlatform(this.dataPlatformUrn);
+    if (this.dataPlatformInstanceUrn.isPresent()) {
+      dataPlatformInstanceAspect.setInstance(dataPlatformInstanceUrn.get());
+    }
+
+    MetadataChangeProposalWrapper dataPlatformInstanceProposal = 
MetadataChangeProposalWrapper.builder()
+            .entityType(entityUrn.getEntityType())
+            .entityUrn(entityUrn)
+            .upsert()
+            .aspect(dataPlatformInstanceAspect)
+            .build();
+    return dataPlatformInstanceProposal;
+  }
+
   private MetadataChangeProposalWrapper createDomainAspect(Urn entityUrn) {
     try {
       Urn domainUrn = Urn.createFromString(config.getDomainIdentifier());
@@ -252,12 +275,18 @@ public class DataHubSyncClient extends HoodieSyncClient {
         .aspect(new ContainerProperties().setName(databaseName))
         .build();
 
+    List<BrowsePathEntry> paths = dataPlatformInstanceUrn.map(dpiUrn -> 
Collections.singletonList(
+        new BrowsePathEntry().setUrn(dpiUrn).setId(dpiUrn.toString()))
+    ).orElse(Collections.emptyList());
+
     Stream<MetadataChangeProposalWrapper> resultStream = Stream.of(
-        containerEntityProposal,
-        createSubTypeAspect(databaseUrn, "Database"),
-        createBrowsePathsAspect(databaseUrn, Collections.emptyList()), 
createStatusAspect(databaseUrn),
-        config.attachDomain() ? createDomainAspect(databaseUrn) : null
-    ).filter(Objects::nonNull);
+            containerEntityProposal,
+            createSubTypeAspect(databaseUrn, "Database"),
+            createDataPlatformInstanceAspect(databaseUrn),
+            createBrowsePathsAspect(databaseUrn, paths),
+            createStatusAspect(databaseUrn),
+            config.attachDomain() ? createDomainAspect(databaseUrn) : null
+        ).filter(Objects::nonNull);
     return resultStream;
   }
 
@@ -314,13 +343,23 @@ public class DataHubSyncClient extends HoodieSyncClient {
   }
 
   private Stream<MetadataChangeProposalWrapper> createDatasetEntity() {
+    BrowsePathEntry databasePath = new 
BrowsePathEntry().setUrn(databaseUrn).setId(databaseName);
+    List<BrowsePathEntry> paths = dataPlatformInstanceUrn.map(dpiUrn -> {
+      List<BrowsePathEntry> list = new ArrayList<BrowsePathEntry>();
+      list.add(new BrowsePathEntry().setUrn(dpiUrn).setId(dpiUrn.toString()));
+      list.add(databasePath);
+      return list;
+    }
+    ).orElse(Collections.singletonList(databasePath));
+
     Stream<MetadataChangeProposalWrapper> result = Stream.of(
-        createStatusAspect(datasetUrn),
-        createSubTypeAspect(datasetUrn, "Table"),
-        createBrowsePathsAspect(datasetUrn, Collections.singletonList(new 
BrowsePathEntry().setUrn(databaseUrn).setId(databaseName))),
-        createContainerAspect(datasetUrn, databaseUrn),
-        createSchemaMetadataAspect(tableName),
-        config.attachDomain() ? createDomainAspect(datasetUrn) : null
+            createStatusAspect(datasetUrn),
+            createSubTypeAspect(datasetUrn, "Table"),
+            createDataPlatformInstanceAspect(datasetUrn),
+            createBrowsePathsAspect(datasetUrn, paths),
+            createContainerAspect(datasetUrn, databaseUrn),
+            createSchemaMetadataAspect(tableName),
+            config.attachDomain() ? createDomainAspect(datasetUrn) : null
     ).filter(Objects::nonNull);
     return result;
   }
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
index a54c7c85e48..2731563e04b 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/DataHubSyncConfig.java
@@ -78,6 +78,13 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       .withDocumentation("String used to represent Hudi when creating its 
corresponding DataPlatform entity "
           + "within Datahub");
 
+  public static final ConfigProperty<String> 
META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME = ConfigProperty
+      .key("hoodie.meta.sync.datahub.dataplatform_instance.name")
+      .noDefaultValue()
+      .markAdvanced()
+      .withDocumentation("String used to represent Hudi instance when emitting 
Container and Dataset entities "
+          + "with the corresponding DataPlatformInstance, only if given.");
+
   public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_ENV = 
ConfigProperty
       .key("hoodie.meta.sync.datahub.dataset.env")
       .defaultValue(DEFAULT_DATAHUB_ENV.name())
@@ -174,6 +181,10 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
         + "corresponding DataPlatform entity within Datahub")
     public String dataPlatformName;
 
+    @Parameter(names = {"--data-platform-instance-name"}, description = 
"String used to represent Hudi instance when emitting Container and Dataset 
entities "
+        + "with the corresponding DataPlatformInstance, only if given.")
+    public String dataPlatformInstanceName;
+
     @Parameter(names = {"--dataset-env"}, description = "Which Datahub 
Environment to use when pushing entities")
     public String datasetEnv;
 
@@ -196,6 +207,7 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_TOKEN.key(), 
emitterToken);
       
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), 
emitterSupplierClass);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), 
dataPlatformName);
+      
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(), 
dataPlatformInstanceName);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(), 
datasetEnv);
       props.setPropertyIfNonNull(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key(), 
domainIdentifier);
       // We want the default behavior of DataHubSync Tool when run as command 
line to NOT suppress exceptions
diff --git a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
index 3c964c1c6b5..055ed8f4b57 100644
--- a/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
+++ b/hudi-sync/hudi-datahub-sync/src/main/java/org/apache/hudi/sync/datahub/config/HoodieDataHubDatasetIdentifier.java
@@ -19,6 +19,8 @@
 
 package org.apache.hudi.sync.datahub.config;
 
+import org.apache.hudi.common.util.Option;
+
 import com.linkedin.common.FabricType;
 import com.linkedin.common.urn.DataPlatformUrn;
 import com.linkedin.common.urn.DatasetUrn;
@@ -29,6 +31,7 @@ import java.util.Properties;
 
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
 import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
 import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;
 
@@ -43,6 +46,10 @@ public class HoodieDataHubDatasetIdentifier {
   public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV;
 
   protected final Properties props;
+  private final String dataPlatform;
+  private final DataPlatformUrn dataPlatformUrn;
+  private final Option<String> dataPlatformInstance;
+  private final Option<Urn> dataPlatformInstanceUrn;
   private final DatasetUrn datasetUrn;
   private final Urn databaseUrn;
   private final String tableName;
@@ -55,20 +62,28 @@ public class HoodieDataHubDatasetIdentifier {
     }
     DataHubSyncConfig config = new DataHubSyncConfig(props);
 
+    this.dataPlatform = 
config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME);
+    this.dataPlatformUrn = createDataPlatformUrn(this.dataPlatform);
+    this.dataPlatformInstance = 
Option.ofNullable(config.getString(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME));
+    this.dataPlatformInstanceUrn = createDataPlatformInstanceUrn(
+        this.dataPlatformUrn,
+        
Option.ofNullable(config.getString(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME))
+    );
     this.datasetUrn = new DatasetUrn(
-        
createDataPlatformUrn(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)),
-        createDatasetName(config.getString(META_SYNC_DATABASE_NAME), 
config.getString(META_SYNC_TABLE_NAME)),
-        
FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
+            this.dataPlatformUrn,
+            createDatasetName(this.dataPlatformInstance, 
config.getString(META_SYNC_DATABASE_NAME), 
config.getString(META_SYNC_TABLE_NAME)),
+            
FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
     );
 
     this.tableName = config.getString(META_SYNC_TABLE_NAME);
     this.databaseName = config.getString(META_SYNC_DATABASE_NAME);
 
+    // 
https://github.com/datahub-project/datahub/blob/0b105395e913cc47a59bdeed0c56d7c0d4b71b63/metadata-ingestion/src/datahub/emitter/mcp_builder.py#L69-L72
     DatabaseKey databaseKey = DatabaseKey.builder()
-        
.platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME))
-        .instance(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
-        .database(this.databaseName)
-        .build();
+            
.platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME))
+            
.instance(this.dataPlatformInstance.orElse(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV)))
+            .database(this.databaseName)
+            .build();
 
     this.databaseUrn = databaseKey.asUrn();
   }
@@ -77,6 +92,22 @@ public class HoodieDataHubDatasetIdentifier {
     return this.datasetUrn;
   }
 
+  public String getDataPlatform() {
+    return this.dataPlatform;
+  }
+
+  public DataPlatformUrn getDataPlatformUrn() {
+    return this.dataPlatformUrn;
+  }
+
+  public Option<String> getDataPlatformInstance() {
+    return this.dataPlatformInstance;
+  }
+
+  public Option<Urn> getDataPlatformInstanceUrn() {
+    return this.dataPlatformInstanceUrn;
+  }
+
   public Urn getDatabaseUrn() {
     return this.databaseUrn;
   }
@@ -93,7 +124,22 @@ public class HoodieDataHubDatasetIdentifier {
     return new DataPlatformUrn(platformUrn);
   }
 
-  private static String createDatasetName(String databaseName, String 
tableName) {
+  private static Option<Urn> createDataPlatformInstanceUrn(DataPlatformUrn 
dataPlatformUrn, Option<String> dataPlatformInstance) {
+    if (dataPlatformInstance.isEmpty()) {
+      return Option.empty();
+    }
+    String dataPlatformInstanceStr = 
String.format("urn:li:dataPlatformInstance:(%s,%s)", 
dataPlatformUrn.toString(), dataPlatformInstance.get());
+    try {
+      return Option.of(Urn.createFromString(dataPlatformInstanceStr));
+    } catch (Exception e) {
+      throw new IllegalArgumentException(String.format("Failed to create 
DataPlatformInstance URN from string: %s", dataPlatformInstanceStr), e);
+    }
+  }
+
+  private static String createDatasetName(Option<String> dataPlatformInstance, 
String databaseName, String tableName) {
+    if (dataPlatformInstance.isPresent()) {
+      return String.format("%s.%s.%s", dataPlatformInstance.get(), 
databaseName, tableName);
+    }
     return String.format("%s.%s", databaseName, tableName);
   }
 }
\ No newline at end of file
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
index d9910bacab5..30d7eb8e405 100644
--- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/TestDataHubSyncClient.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
@@ -47,10 +48,13 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
 import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_SYNC_SUPPRESS_EXCEPTIONS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -97,7 +101,7 @@ public class TestDataHubSyncClient {
   }
 
   @Test
-  public void testUpdateTableSchemaInvokesRestEmitter() throws IOException {
+  public void testUpdateTableSchema() throws IOException {
     Properties props = new Properties();
     props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), 
DummyPartitionValueExtractor.class.getName());
     props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
@@ -111,9 +115,82 @@ public class TestDataHubSyncClient {
     DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, 
restEmitterMock);
     DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
 
-    dhClient.updateTableSchema("some_table", null/*, null*/);
-    verify(restEmitterMock, 
times(9)).emit(any(MetadataChangeProposalWrapper.class),
-        Mockito.any());
+    dhClient.updateTableSchema("some_table", null);
+    ArgumentCaptor<MetadataChangeProposalWrapper> captor = 
ArgumentCaptor.forClass(MetadataChangeProposalWrapper.class);
+    verify(restEmitterMock, times(11)).emit(captor.capture(), Mockito.any());
+
+    Map<String, String> capturedProposalsMap = captor.getAllValues().stream()
+        .collect(Collectors.toMap(
+            mcpw -> mcpw.getEntityUrn() + "+" + mcpw.getAspectName(),
+            mcpw -> mcpw.getAspect().toString()
+        ));
+
+    Map<String, String> expectedProposalsMap = new HashMap<>();
+    
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+containerProperties",
 "{name=default}");
+    
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+subTypes",
 "{typeNames=[Database]}");
+    
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+dataPlatformInstance",
 "{platform=urn:li:dataPlatform:hudi}");
+    
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+browsePathsV2",
 "{path=[]}");
+    
expectedProposalsMap.put("urn:li:container:ca5c62a21c8486b650b16634aacd3996+status",
 "{removed=false}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+status",
 "{removed=false}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+subTypes",
 "{typeNames=[Table]}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+dataPlatformInstance",
 "{platform=urn:li:dataPlatform:hudi}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+browsePathsV2",
 "{path=[{urn=urn:li:container:ca5c62a21c8486b650b16634aacd3996, 
id=default}]}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+container",
 "{container=urn:li:container:ca5c62a21c8486b650b16634aacd3996}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,default.unknown,DEV)+schemaMetadata",
 "{platformSchema={com.linkedin.schema.OtherSchema={rawSchema"
+        + 
"={\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"}]}}},
 schemaName=triprec, fields=[{nullable=false, fieldPath=[version=2.0]"
+        + ".[type=triprec].[type=long].ts, isPartOfKey=false, 
type={type={com.linkedin.schema.NumberType={}}}, nativeDataType=long}], 
version=0, platform=urn:li:dataPlatform:hudi"
+        + ", hash=fcb5c4d60382cb4d1dd4710810b42295}");
+
+    assertEquals(expectedProposalsMap, capturedProposalsMap);
+  }
+
+  @Test
+  public void testUpdateTableSchemaWhenPlatformInstance() throws IOException {
+    Properties props = new Properties();
+    props.put(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), 
DummyPartitionValueExtractor.class.getName());
+    props.put(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(), 
"test_instance");
+    props.put(META_SYNC_BASE_PATH.key(), tableBasePath);
+
+    Mockito.when(
+        restEmitterMock.emit(any(MetadataChangeProposalWrapper.class), 
Mockito.any())
+    ).thenReturn(
+        
CompletableFuture.completedFuture(MetadataWriteResponse.builder().build())
+    );
+
+    DatahubSyncConfigStub configStub = new DatahubSyncConfigStub(props, 
restEmitterMock);
+    DataHubSyncClientStub dhClient = new DataHubSyncClientStub(configStub);
+
+    dhClient.updateTableSchema("some_table", null);
+    ArgumentCaptor<MetadataChangeProposalWrapper> captor = 
ArgumentCaptor.forClass(MetadataChangeProposalWrapper.class);
+    verify(restEmitterMock, times(11)).emit(captor.capture(), Mockito.any());
+
+    Map<String, String> capturedProposalsMap = captor.getAllValues().stream()
+        .collect(Collectors.toMap(
+            mcpw -> mcpw.getEntityUrn() + "+" + mcpw.getAspectName(),
+            mcpw -> mcpw.getAspect().toString()
+        ));
+
+    Map<String, String> expectedProposalsMap = new HashMap<>();
+    
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+containerProperties",
 "{name=default}");
+    
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+subTypes",
 "{typeNames=[Database]}");
+    
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+dataPlatformInstance",
 "{platform=urn:li:dataPlatform:hudi, instance="
+        + 
"urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}");
+    
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+browsePathsV2",
 "{path=[{urn=urn:li:dataPlatformInstance:(urn:li:dataPlatform"
+        + ":hudi,test_instance), 
id=urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}]}");
+    
expectedProposalsMap.put("urn:li:container:da9c1430eb7551811f4c0e11b911614d+status",
 "{removed=false}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+status",
 "{removed=false}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+subTypes",
 "{typeNames=[Table]}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+dataPlatformInstance",
 "{platform=urn:li:dataPlatform:hudi, instance="
+        + 
"urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+browsePathsV2",
 "{path=[{urn=urn:li:dataPlatformInstance:(urn:li:dataPlatform"
+        + ":hudi,test_instance), 
id=urn:li:dataPlatformInstance:(urn:li:dataPlatform:hudi,test_instance)}, 
{urn=urn:li:container:da9c1430eb7551811f4c0e11b911614d, id=default}]}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+container",
 "{container=urn:li:container:da9c1430eb7551811f4c0e11b911614d}");
+    
expectedProposalsMap.put("urn:li:dataset:(urn:li:dataPlatform:hudi,test_instance.default.unknown,DEV)+schemaMetadata",
 "{platformSchema={com.linkedin.schema.OtherSchema={rawSchema"
+        + 
"={\"type\":\"record\",\"name\":\"triprec\",\"fields\":[{\"name\":\"ts\",\"type\":\"long\"}]}}},
 schemaName=triprec, fields=[{nullable=false, fieldPath=[version=2.0]"
+        + ".[type=triprec].[type=long].ts, isPartOfKey=false, 
type={type={com.linkedin.schema.NumberType={}}}, nativeDataType=long}], 
version=0, platform=urn:li:dataPlatform:hudi"
+        + ", hash=fcb5c4d60382cb4d1dd4710810b42295}");
+
+    assertEquals(expectedProposalsMap, capturedProposalsMap);
   }
 
   @Test
diff --git a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
index 52af11f0a85..9310944bb88 100644
--- a/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
+++ b/hudi-sync/hudi-datahub-sync/src/test/java/org/apache/hudi/sync/datahub/config/TestHoodieDataHubDatasetIdentifier.java
@@ -30,6 +30,7 @@ import java.util.Properties;
 
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
 import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
 import static 
org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -144,4 +145,56 @@ class TestHoodieDataHubDatasetIdentifier {
       new HoodieDataHubDatasetIdentifier(props);
     });
   }
+
+  @Test
+  @DisplayName("Test constructor with platform instance")
+  void testConstructorWithPlatformInstance() {
+    String expectedDatabaseUrnWithPlatformInstance = 
"urn:li:container:ee430d6d2a1fb6336b0e972809e41e55";
+    String expectedDatabaseUrnWithEnvAsInstance = 
"urn:li:container:ec7465a48d93b5c5e57eca1f44febed5";
+
+    // Given both platform instance and env
+    props.setProperty(META_SYNC_DATABASE_NAME.key(), "test_db");
+    props.setProperty(META_SYNC_TABLE_NAME.key(), "test_table");
+    props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), 
"custom_platform");
+    props.setProperty(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(), 
"custom_instance");
+    props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");
+
+    // When
+    HoodieDataHubDatasetIdentifier identifier = new 
HoodieDataHubDatasetIdentifier(props);
+
+    // Then
+    assertEquals("custom_platform", identifier.getDataPlatform());
+    assertEquals("urn:li:dataPlatform:custom_platform", 
identifier.getDataPlatformUrn().toString());
+    assertEquals("custom_instance", 
identifier.getDataPlatformInstance().get());
+    
assertEquals("urn:li:dataPlatformInstance:(urn:li:dataPlatform:custom_platform,custom_instance)",
 identifier.getDataPlatformInstanceUrn().get().toString());
+    assertEquals(expectedDatabaseUrnWithPlatformInstance, 
identifier.getDatabaseUrn().toString());
+
+    // Given platform instance only
+    props.remove(META_SYNC_DATAHUB_DATASET_ENV.key());
+
+    // When
+    identifier = new HoodieDataHubDatasetIdentifier(props);
+
+    // Then
+    assertEquals("custom_platform", identifier.getDataPlatform());
+    assertEquals("urn:li:dataPlatform:custom_platform", 
identifier.getDataPlatformUrn().toString());
+    assertEquals("custom_instance", 
identifier.getDataPlatformInstance().get());
+    
assertEquals("urn:li:dataPlatformInstance:(urn:li:dataPlatform:custom_platform,custom_instance)",
 identifier.getDataPlatformInstanceUrn().get().toString());
+    assertEquals(expectedDatabaseUrnWithPlatformInstance, 
identifier.getDatabaseUrn().toString());
+
+    // Given env only
+    props.remove(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key());
+    props.setProperty(META_SYNC_DATAHUB_DATASET_ENV.key(), "PROD");
+
+    // When
+    identifier = new HoodieDataHubDatasetIdentifier(props);
+
+    // Then
+    assertEquals("custom_platform", identifier.getDataPlatform());
+    assertEquals("urn:li:dataPlatform:custom_platform", 
identifier.getDataPlatformUrn().toString());
+    assertTrue(identifier.getDataPlatformInstance().isEmpty());
+    assertTrue(identifier.getDataPlatformInstanceUrn().isEmpty());
+    assertEquals(expectedDatabaseUrnWithEnvAsInstance, 
identifier.getDatabaseUrn().toString());
+
+  }
 }
\ No newline at end of file

Reply via email to