This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ab094a42 [GOBBLIN-1890] Offset ranges allow multiple formats GMIP (#3753)
0ab094a42 is described below

commit 0ab094a4277b92e17c46b3722b603daaf126d022
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Aug 29 15:55:24 2023 -0700

    [GOBBLIN-1890] Offset ranges allow multiple formats GMIP (#3753)
    
    * [GOBBLIN-1890] Offset ranges allow multiple formats GMIP
    
    * Mirror the change in IcebergMetadataWriter
---
 .../gobblin/hive/writer/HiveMetadataWriter.java    | 11 +++++---
 .../iceberg/writer/IcebergMetadataWriter.java      |  3 ++-
 .../iceberg/writer/HiveMetadataWriterTest.java     | 29 +++++++++++++++++++---
 3 files changed, 35 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index e5ac2741c..d86c0c947 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -267,18 +267,23 @@ public class HiveMetadataWriter implements MetadataWriter 
{
   }
 
   @Nullable
-  private String getTopicName(GobblinMetadataChangeEvent gmce) {
+  protected String getTopicName(GobblinMetadataChangeEvent gmce) {
     //Calculate the topic name from gmce, fall back to topic.name in hive spec 
which can also be null
     //todo: make topicName fall back to topic.name in hive spec so that we can 
also get schema for re-write operation
     String topicName = null;
     if (gmce.getTopicPartitionOffsetsRange() != null && 
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+      // In case the topic name is not the table name or the topic name 
contains '-'
       String topicPartitionString = 
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
-      //In case the topic name is not the table name or the topic name 
contains '-'
-      topicName = topicPartitionString.substring(0, 
topicPartitionString.lastIndexOf('-'));
+      topicName = parseTopicNameFromOffsetRangeKey(topicPartitionString);
     }
     return topicName;
   }
 
+  public static String parseTopicNameFromOffsetRangeKey(String offsetRangeKey) 
{
+    int startOfTopicName = offsetRangeKey.lastIndexOf('.') + 1;
+    return offsetRangeKey.substring(startOfTopicName, 
offsetRangeKey.lastIndexOf('-'));
+  }
+
   /**
    * We care about if a table key is in the spec cache because it means that 
we have already created this table before
    * since the last flush. Therefore, we can use this method to check whether 
we need to create a table
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index d614e2524..713378b99 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -107,6 +107,7 @@ import org.apache.gobblin.hive.HiveLock;
 import org.apache.gobblin.hive.HivePartition;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.HiveMetadataWriter;
 import org.apache.gobblin.hive.writer.MetadataWriter;
 import org.apache.gobblin.hive.writer.MetadataWriterKeys;
 import org.apache.gobblin.iceberg.Utils.IcebergUtils;
@@ -813,7 +814,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     if (tableMetadata.dataOffsetRange.isPresent() && 
tableMetadata.dataOffsetRange.get().size() != 0) {
       String topicPartitionString = 
tableMetadata.dataOffsetRange.get().keySet().iterator().next();
       //In case the topic name is not the table name or the topic name 
contains '-'
-      return topicPartitionString.substring(0, 
topicPartitionString.lastIndexOf('-'));
+      return 
HiveMetadataWriter.parseTopicNameFromOffsetRangeKey(topicPartitionString);
     }
     return tableMetadata.newProperties.or(
         
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 2797bdf49..d442b942f 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -109,6 +109,7 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
 
   private GobblinMCEWriter gobblinMCEWriter;
 
+  GobblinMetadataChangeEvent.Builder gmceBuilder;
   GobblinMetadataChangeEvent gmce;
   static File tmpDir;
   static File dataDir;
@@ -163,7 +164,7 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
     writeRecord(dailyDataFile);
     Map<String, String> registrationState = new HashMap();
     registrationState.put("hive.database.name", dbName);
-    gmce = GobblinMetadataChangeEvent.newBuilder()
+    gmceBuilder = GobblinMetadataChangeEvent.newBuilder()
         .setDatasetIdentifier(DatasetIdentifier.newBuilder()
             .setDataOrigin(DataOrigin.EI)
             .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
@@ -183,13 +184,14 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
         .setPartitionColumns(Lists.newArrayList("testpartition"))
         .setRegistrationPolicy(TestHiveRegistrationPolicy.class.getName())
         .setRegistrationProperties(registrationState)
-        
.setAllowedMetadataWriters(Collections.singletonList(HiveMetadataWriter.class.getName()))
-        .build();
+        
.setAllowedMetadataWriters(Collections.singletonList(TestHiveMetadataWriter.class.getName()));
+    gmce = gmceBuilder.build();
+
     state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
         KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
     state.setProp("default.hive.registration.policy",
         TestHiveRegistrationPolicy.class.getName());
-    state.setProp("gmce.metadata.writer.classes", 
"org.apache.gobblin.hive.writer.HiveMetadataWriter");
+    state.setProp("gmce.metadata.writer.classes", 
TestHiveMetadataWriter.class.getName());
     gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(), 
state);
   }
 
@@ -401,6 +403,21 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
     Assert.assertThrows(IllegalStateException.class, () -> 
updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral));
   }
 
+  @Test
+  public void testGetTopicName() {
+    final String expectedTopicName = "123-topic-Name-123_v2";
+    Function<String, GobblinMetadataChangeEvent> getGmce = (offsetRangeKey) -> 
{
+      Map<String, String> offsetRangeMap = new HashMap<>();
+      offsetRangeMap.put(String.format(offsetRangeKey, expectedTopicName), 
"0-100");
+      return 
GobblinMetadataChangeEvent.newBuilder(gmceBuilder).setTopicPartitionOffsetsRange(offsetRangeMap).build();
+    };
+
+    TestHiveMetadataWriter hiveWriter = (TestHiveMetadataWriter) 
gobblinMCEWriter.getMetadataWriters().get(0);
+    Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("%s-0")), 
expectedTopicName);
+    
Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("kafkaIdentifier.%s-0")),
 expectedTopicName);
+    
Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("kafkaIdentifier.foobar.%s-0")),
 expectedTopicName);
+  }
+
   private String writeRecord(File file) throws IOException {
     GenericData.Record record = new GenericData.Record(avroDataSchema);
     record.put("id", 1L);
@@ -427,6 +444,10 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
       super(state);
     }
 
+    public String getTopicName(GobblinMetadataChangeEvent gmce) {
+      return super.getTopicName(gmce);
+    }
+
     public static boolean updateLatestSchemaMapWithExistingSchema(
         String dbName,
         String tableName,

Reply via email to