This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0ab094a42 [GOBBLIN-1890] Offset ranges allow multiple formats GMIP (#3753)
0ab094a42 is described below
commit 0ab094a4277b92e17c46b3722b603daaf126d022
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Aug 29 15:55:24 2023 -0700
[GOBBLIN-1890] Offset ranges allow multiple formats GMIP (#3753)
* [GOBBLIN-1890] Offset ranges allow multiple formats GMIP
* Mirror the change in IcebergMetadataWriter
---
.../gobblin/hive/writer/HiveMetadataWriter.java | 11 +++++---
.../iceberg/writer/IcebergMetadataWriter.java | 3 ++-
.../iceberg/writer/HiveMetadataWriterTest.java | 29 +++++++++++++++++++---
3 files changed, 35 insertions(+), 8 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index e5ac2741c..d86c0c947 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -267,18 +267,23 @@ public class HiveMetadataWriter implements MetadataWriter {
}
@Nullable
- private String getTopicName(GobblinMetadataChangeEvent gmce) {
+ protected String getTopicName(GobblinMetadataChangeEvent gmce) {
//Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
//todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation
String topicName = null;
if (gmce.getTopicPartitionOffsetsRange() != null &&
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+ // In case the topic name is not the table name or the topic name contains '-'
String topicPartitionString =
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
- //In case the topic name is not the table name or the topic name contains '-'
- topicName = topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ topicName = parseTopicNameFromOffsetRangeKey(topicPartitionString);
}
return topicName;
}
+ public static String parseTopicNameFromOffsetRangeKey(String offsetRangeKey)
{
+ int startOfTopicName = offsetRangeKey.lastIndexOf('.') + 1;
+ return offsetRangeKey.substring(startOfTopicName,
offsetRangeKey.lastIndexOf('-'));
+ }
+
/**
* We care about if a table key is in the spec cache because it means that we have already created this table before
* since the last flush. Therefore, we can use this method to check whether we need to create a table
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index d614e2524..713378b99 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -107,6 +107,7 @@ import org.apache.gobblin.hive.HiveLock;
import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
+import org.apache.gobblin.hive.writer.HiveMetadataWriter;
import org.apache.gobblin.hive.writer.MetadataWriter;
import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
@@ -813,7 +814,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
if (tableMetadata.dataOffsetRange.isPresent() &&
tableMetadata.dataOffsetRange.get().size() != 0) {
String topicPartitionString =
tableMetadata.dataOffsetRange.get().keySet().iterator().next();
//In case the topic name is not the table name or the topic name contains '-'
- return topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ return
HiveMetadataWriter.parseTopicNameFromOffsetRangeKey(topicPartitionString);
}
return tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 2797bdf49..d442b942f 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -109,6 +109,7 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
private GobblinMCEWriter gobblinMCEWriter;
+ GobblinMetadataChangeEvent.Builder gmceBuilder;
GobblinMetadataChangeEvent gmce;
static File tmpDir;
static File dataDir;
@@ -163,7 +164,7 @@ public class HiveMetadataWriterTest extends
HiveMetastoreTest {
writeRecord(dailyDataFile);
Map<String, String> registrationState = new HashMap();
registrationState.put("hive.database.name", dbName);
- gmce = GobblinMetadataChangeEvent.newBuilder()
+ gmceBuilder = GobblinMetadataChangeEvent.newBuilder()
.setDatasetIdentifier(DatasetIdentifier.newBuilder()
.setDataOrigin(DataOrigin.EI)
.setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
@@ -183,13 +184,14 @@ public class HiveMetadataWriterTest extends
HiveMetastoreTest {
.setPartitionColumns(Lists.newArrayList("testpartition"))
.setRegistrationPolicy(TestHiveRegistrationPolicy.class.getName())
.setRegistrationProperties(registrationState)
-
.setAllowedMetadataWriters(Collections.singletonList(HiveMetadataWriter.class.getName()))
- .build();
+
.setAllowedMetadataWriters(Collections.singletonList(TestHiveMetadataWriter.class.getName()));
+ gmce = gmceBuilder.build();
+
state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
state.setProp("default.hive.registration.policy",
TestHiveRegistrationPolicy.class.getName());
- state.setProp("gmce.metadata.writer.classes",
"org.apache.gobblin.hive.writer.HiveMetadataWriter");
+ state.setProp("gmce.metadata.writer.classes",
TestHiveMetadataWriter.class.getName());
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(),
state);
}
@@ -401,6 +403,21 @@ public class HiveMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertThrows(IllegalStateException.class, () ->
updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral));
}
+ @Test
+ public void testGetTopicName() {
+ final String expectedTopicName = "123-topic-Name-123_v2";
+ Function<String, GobblinMetadataChangeEvent> getGmce = (offsetRangeKey) ->
{
+ Map<String, String> offsetRangeMap = new HashMap<>();
+ offsetRangeMap.put(String.format(offsetRangeKey, expectedTopicName),
"0-100");
+ return
GobblinMetadataChangeEvent.newBuilder(gmceBuilder).setTopicPartitionOffsetsRange(offsetRangeMap).build();
+ };
+
+ TestHiveMetadataWriter hiveWriter = (TestHiveMetadataWriter)
gobblinMCEWriter.getMetadataWriters().get(0);
+ Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("%s-0")),
expectedTopicName);
+
Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("kafkaIdentifier.%s-0")),
expectedTopicName);
+
Assert.assertEquals(hiveWriter.getTopicName(getGmce.apply("kafkaIdentifier.foobar.%s-0")),
expectedTopicName);
+ }
+
private String writeRecord(File file) throws IOException {
GenericData.Record record = new GenericData.Record(avroDataSchema);
record.put("id", 1L);
@@ -427,6 +444,10 @@ public class HiveMetadataWriterTest extends
HiveMetastoreTest {
super(state);
}
+ public String getTopicName(GobblinMetadataChangeEvent gmce) {
+ return super.getTopicName(gmce);
+ }
+
public static boolean updateLatestSchemaMapWithExistingSchema(
String dbName,
String tableName,