ZihanLi58 commented on code in PR #3633:
URL: https://github.com/apache/gobblin/pull/3633#discussion_r1100604690


##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +229,69 @@ public void write(GobblinMetadataChangeEvent gmce, 
Map<String, Collection<HiveSp
         break;
       }
       default: {
-        log.error("unsupported operation {}", 
gmce.getOperationType().toString());
+        log.error("unsupported operation {}", opType);
         return;
       }
     }
   }
 
+  /**
+   * Helper function to gracefully handle errors when creating a hive table. 
i.e. IOExceptions when creating the table
+   * are swallowed and logged to error
+   * @param tableSpec
+   * @param tableKey table key used to check if table is in spec cache
+   * @return if the table the table was created. If the table existed 
beforehand, it still returns true.
+   */
+  private boolean createTable(HiveSpec tableSpec, String tableKey) {
+    try {
+      // no-op if it's in spec cache (spec cache contains tablekey for all db 
/ tables created since last flush)
+      if (inHiveSpecCache(tableKey)) {
+        return true;
+      }
+
+      this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+      return true;
+    } catch (IOException e) {
+      log.error("Failed to create table. Skipping this event", e);
+      return false;
+    }
+  }
+
+  @Nullable
+  private String getTopicName(GobblinMetadataChangeEvent gmce) {
+    //Calculate the topic name from gmce, fall back to topic.name in hive spec 
which can also be null
+    //todo: make topicName fall back to topic.name in hive spec so that we can 
also get schema for re-write operation
+    String topicName = null;
+    if (gmce.getTopicPartitionOffsetsRange() != null && 
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+      String topicPartitionString = 
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+      //In case the topic name is not the table name or the topic name 
contains '-'
+      topicName = topicPartitionString.substring(0, 
topicPartitionString.lastIndexOf('-'));
+    }
+    return topicName;
+  }
+
+  /**
+   * We care about if a table key is in the spec cache because it means that 
we have already created this table before
+   * since the last flush. Therefore, we can use this method to check whether 
we need to create a table
+   * @param tableKey
+   * @return
+   */
+  private boolean inHiveSpecCache(String tableKey) {
+    return specMaps.containsKey(tableKey) && specMaps.get(tableKey).size() > 0;
+  }
+
+  private void updateLatestSchemaMap(String dbName, String tableName, String 
tableKey) throws IOException {
+    //ToDo: after making sure all spec has topic.name set, we should use 
topicName as key for schema
+    boolean alwaysUseLatestSchema = 
useLatestTableSchemaAllowDenyList.acceptTable(dbName, tableName);

Review Comment:
   To make this clearer, I think this variable should be renamed to 
`alwaysUseExistingSchema`, and I'd suggest renaming the allow/deny list 
(`useLatestTableSchemaAllowDenyList`) accordingly while you're touching it.



##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +229,69 @@ public void write(GobblinMetadataChangeEvent gmce, 
Map<String, Collection<HiveSp
         break;
       }
       default: {
-        log.error("unsupported operation {}", 
gmce.getOperationType().toString());
+        log.error("unsupported operation {}", opType);
         return;
       }
     }
   }
 
+  /**
+   * Helper function to gracefully handle errors when creating a hive table. 
i.e. IOExceptions when creating the table
+   * are swallowed and logged to error
+   * @param tableSpec
+   * @param tableKey table key used to check if table is in spec cache
+   * @return if the table the table was created. If the table existed 
beforehand, it still returns true.
+   */
+  private boolean createTable(HiveSpec tableSpec, String tableKey) {
+    try {
+      // no-op if it's in spec cache (spec cache contains tablekey for all db 
/ tables created since last flush)
+      if (inHiveSpecCache(tableKey)) {
+        return true;
+      }
+
+      this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+      return true;
+    } catch (IOException e) {
+      log.error("Failed to create table. Skipping this event", e);
+      return false;
+    }
+  }
+
+  @Nullable
+  private String getTopicName(GobblinMetadataChangeEvent gmce) {
+    //Calculate the topic name from gmce, fall back to topic.name in hive spec 
which can also be null
+    //todo: make topicName fall back to topic.name in hive spec so that we can 
also get schema for re-write operation
+    String topicName = null;
+    if (gmce.getTopicPartitionOffsetsRange() != null && 
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+      String topicPartitionString = 
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+      //In case the topic name is not the table name or the topic name 
contains '-'
+      topicName = topicPartitionString.substring(0, 
topicPartitionString.lastIndexOf('-'));
+    }
+    return topicName;
+  }
+
+  /**
+   * We care about if a table key is in the spec cache because it means that 
we have already created this table before
+   * since the last flush. Therefore, we can use this method to check whether 
we need to create a table
+   * @param tableKey
+   * @return
+   */
+  private boolean inHiveSpecCache(String tableKey) {
+    return specMaps.containsKey(tableKey) && specMaps.get(tableKey).size() > 0;
+  }
+
+  private void updateLatestSchemaMap(String dbName, String tableName, String 
tableKey) throws IOException {

Review Comment:
   Please rename this method to `updateLatestSchemaMapWithExistingSchema`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to