This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d90d1961b Add GMCE topic explicitly to hive commit event (#3547)
d90d1961b is described below

commit d90d1961bbb8242a45266ec02eb0f27da5735c14
Author: Jack Moseley <[email protected]>
AuthorDate: Wed Aug 31 13:36:29 2022 -0700

    Add GMCE topic explicitly to hive commit event (#3547)
---
 .../apache/gobblin/hive/writer/HiveMetadataWriter.java    | 15 +++++++++++++--
 .../apache/gobblin/hive/writer/MetadataWriterKeys.java    |  1 +
 .../gobblin/iceberg/writer/IcebergMetadataWriter.java     |  3 ++-
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 84f805eb9..730210159 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -96,6 +96,9 @@ public class HiveMetadataWriter implements MetadataWriter {
   /* Mapping from tableIdentifier to a cache, key'ed by a list of partitions with value as HiveSpec object. */
   private final HashMap<String, Cache<List<String>, HiveSpec>> specMaps;
 
+  // Used to store the relationship between table and the gmce topicPartition
+  private final HashMap<String, String> tableTopicPartitionMap;
+
   /* Mapping from tableIdentifier to latest schema observed. */
   private final HashMap<String, String> latestSchemaMap;
   private final long timeOutSeconds;
@@ -116,6 +119,7 @@ public class HiveMetadataWriter implements MetadataWriter {
     this.schemaCreationTimeMap = new HashMap<>();
     this.specMaps = new HashMap<>();
     this.latestSchemaMap = new HashMap<>();
+    this.tableTopicPartitionMap = new HashMap<>();
     this.timeOutSeconds =
         state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS, DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
     if (!state.contains(HiveRegister.HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY)) {
@@ -169,7 +173,7 @@ public class HiveMetadataWriter implements MetadataWriter {
   }
 
   public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
-      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec) throws IOException {
+      Map<String, Collection<HiveSpec>> oldSpecsMap, HiveSpec tableSpec, String gmceTopicPartition) throws IOException {
     String dbName = tableSpec.getTable().getDbName();
     String tableName = tableSpec.getTable().getTableName();
     String tableKey = tableNameJoiner.join(dbName, tableName);
@@ -185,6 +189,8 @@ public class HiveMetadataWriter implements MetadataWriter {
           
existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
     }
 
+    tableTopicPartitionMap.put(tableKey, gmceTopicPartition);
+
     //Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
     //todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation
     String topicName = null;
@@ -366,7 +372,7 @@ public class HiveMetadataWriter implements MetadataWriter {
         (GobblinMetadataChangeEvent) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
     if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
       try {
-        write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
+        write(gmce, newSpecsMap, oldSpecsMap, tableSpec, recordEnvelope.getWatermark().getSource());
       } catch (IOException e) {
         throw new HiveMetadataWriterWithPartitionInfoException(getPartitionValues(newSpecsMap), getPartitionValues(oldSpecsMap), e);
       }
@@ -398,6 +404,11 @@ public class HiveMetadataWriter implements MetadataWriter {
     
gobblinTrackingEvent.addMetadata(MetadataWriterKeys.HIVE_PARTITION_OPERATION_KEY,
 operation.name());
     gobblinTrackingEvent.addMetadata(MetadataWriterKeys.PARTITION_HDFS_PATH, 
hiveSpec.getPath().toString());
 
+    String gmceTopicPartition = tableTopicPartitionMap.get(tableNameJoiner.join(dbName, tableName));
+    if (gmceTopicPartition != null) {
+      gobblinTrackingEvent.addMetadata(MetadataWriterKeys.HIVE_EVENT_GMCE_TOPIC_NAME, gmceTopicPartition.split("-")[0]);
+    }
+
     return gobblinTrackingEvent;
   }
 
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriterKeys.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriterKeys.java
index aed1366b5..14dff3891 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriterKeys.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/MetadataWriterKeys.java
@@ -47,6 +47,7 @@ public class MetadataWriterKeys {
   public static final String FAILED_TO_DROP_PARTITION_VALUES_KEY = "failedToDropPartitionValues";
   public static final String PARTITION_KEYS = "partitionKeys";
   public static final String HIVE_PARTITION_OPERATION_KEY = "hivePartitionOperation";
+  public static final String HIVE_EVENT_GMCE_TOPIC_NAME = "kafkaTopic";
 
   private MetadataWriterKeys() {
   }
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 31cea7c72..e643b1dbb 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -1082,7 +1082,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
           write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
           tableCurrentWatermarkMap.put(tid, currentOffset);
         } else {
-          log.warn(String.format("Skip processing record %s since it has lower watermark", genericRecord.toString()));
+          log.warn(String.format("Skip processing record for table: %s.%s, GMCE offset: %d, GMCE partition: %s since it has lower watermark",
+              dbName, tableName, currentOffset, topicPartition));
         }
       } else {
         log.info(String.format("Skip table %s.%s since it's not selected", 
tableSpec.getTable().getDbName(),

Reply via email to