This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d90d1961b Add GMCE topic explicitly to hive commit event (#3547)
d90d1961b is described below
commit d90d1961bbb8242a45266ec02eb0f27da5735c14
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Aug 31 13:36:29 2022 -0700
Add GMCE topic explicitly to hive commit event (#3547)
---
.../apache/gobblin/hive/writer/HiveMetadataWriter.java | 15 +++++++++++++--
.../apache/gobblin/hive/writer/MetadataWriterKeys.java | 1 +
.../gobblin/iceberg/writer/IcebergMetadataWriter.java | 3 ++-
3 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 84f805eb9..730210159 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -96,6 +96,9 @@ public class HiveMetadataWriter implements MetadataWriter {
/* Mapping from tableIdentifier to a cache, key'ed by a list of partitions with value as HiveSpec object. */
private final HashMap<String, Cache<List<String>, HiveSpec>> specMaps;
+ // Used to store the relationship between table and the gmce topicPartition
+ private final HashMap<String, String> tableTopicPartitionMap;
+
/* Mapping from tableIdentifier to latest schema observed. */
private final HashMap<String, String> latestSchemaMap;
private final long timeOutSeconds;
@@ -116,6 +119,7 @@ public class HiveMetadataWriter implements MetadataWriter {
this.schemaCreationTimeMap = new HashMap<>();
this.specMaps = new HashMap<>();
this.latestSchemaMap = new HashMap<>();
+ this.tableTopicPartitionMap = new HashMap<>();
this.timeOutSeconds =
    state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS, DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
if (!state.contains(HiveRegister.HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY)) {
@@ -169,7 +173,7 @@ public class HiveMetadataWriter implements MetadataWriter {
}
public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
-      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
+      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec, String gmceTopicPartition) throws IOException {
String dbName = tableSpec.getTable().getDbName();
String tableName = tableSpec.getTable().getTableName();
String tableKey = tableNameJoiner.join(dbName, tableName);
@@ -185,6 +189,8 @@ public class HiveMetadataWriter implements MetadataWriter {
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
}
+ tableTopicPartitionMap.put(tableKey, gmceTopicPartition);
+
//Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
//todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation
String topicName = null;
@@ -366,7 +372,7 @@ public class HiveMetadataWriter implements MetadataWriter {
(GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
try {
-        write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
+        write(gmce, newSpecsMap, oldSpecsMap, tableSpec, recordEnvelope.getWatermark().getSource());
} catch (IOException e) {
throw new HiveMetadataWriterWithPartitionInfoException(getPartitionValues(newSpecsMap), getPartitionValues(oldSpecsMap), e);
}
@@ -398,6 +404,11 @@ public class HiveMetadataWriter implements MetadataWriter {
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.HIVE_PARTITION_OPERATION_KEY, operation.name());
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.PARTITION_HDFS_PATH, hiveSpec.getPath().toString());
+    String gmceTopicPartition = tableTopicPartitionMap.get(tableNameJoiner.join(dbName, tableName));
+    if (gmceTopicPartition != null) {
+      gobblinTrackingEvent.addMetadata(MetadataWriterKeys.HIVE_EVENT_GMCE_TOPIC_NAME, gmceTopicPartition.split("-")[0]);
+    }
+
return gobblinTrackingEvent;
}
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriterKeys.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriterKeys.java
index aed1366b5..14dff3891 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriterKeys.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriterKeys.java
@@ -47,6 +47,7 @@ public class MetadataWriterKeys {
public static final String FAILED_TO_DROP_PARTITION_VALUES_KEY = "failedToDropPartitionValues";
public static final String PARTITION_KEYS = "partitionKeys";
public static final String HIVE_PARTITION_OPERATION_KEY = "hivePartitionOperation";
+ public static final String HIVE_EVENT_GMCE_TOPIC_NAME = "kafkaTopic";
private MetadataWriterKeys() {
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 31cea7c72..e643b1dbb 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -1082,7 +1082,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
tableCurrentWatermarkMap.put(tid, currentOffset);
} else {
-          log.warn(String.format("Skip processing record %s since it has lower watermark", genericRecord.toString()));
+          log.warn(String.format("Skip processing record for table: %s.%s, GMCE offset: %d, GMCE partition: %s since it has lower watermark",
+              dbName, tableName, currentOffset, topicPartition));
}
} else {
log.info(String.format("Skip table %s.%s since it's not selected",
tableSpec.getTable().getDbName(),