ZihanLi58 commented on code in PR #3633:
URL: https://github.com/apache/gobblin/pull/3633#discussion_r1100604690
##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +229,69 @@ public void write(GobblinMetadataChangeEvent gmce,
Map<String, Collection<HiveSp
break;
}
default: {
- log.error("unsupported operation {}",
gmce.getOperationType().toString());
+ log.error("unsupported operation {}", opType);
return;
}
}
}
+ /**
+ * Helper function to gracefully handle errors when creating a hive table.
i.e. IOExceptions when creating the table
+ * are swallowed and logged to error
+ * @param tableSpec
+ * @param tableKey table key used to check if table is in spec cache
+ * @return if the table the table was created. If the table existed
beforehand, it still returns true.
+ */
+ private boolean createTable(HiveSpec tableSpec, String tableKey) {
+ try {
+ // no-op if it's in spec cache (spec cache contains tablekey for all db
/ tables created since last flush)
+ if (inHiveSpecCache(tableKey)) {
+ return true;
+ }
+
+ this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to create table. Skipping this event", e);
+ return false;
+ }
+ }
+
+ @Nullable
+ private String getTopicName(GobblinMetadataChangeEvent gmce) {
+ //Calculate the topic name from gmce, fall back to topic.name in hive spec
which can also be null
+ //todo: make topicName fall back to topic.name in hive spec so that we can
also get schema for re-write operation
+ String topicName = null;
+ if (gmce.getTopicPartitionOffsetsRange() != null &&
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+ String topicPartitionString =
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+ //In case the topic name is not the table name or the topic name
contains '-'
+ topicName = topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ }
+ return topicName;
+ }
+
+ /**
+ * We care about if a table key is in the spec cache because it means that
we have already created this table before
+ * since the last flush. Therefore, we can use this method to check whether
we need to create a table
+ * @param tableKey
+ * @return
+ */
+ private boolean inHiveSpecCache(String tableKey) {
+ return specMaps.containsKey(tableKey) && specMaps.get(tableKey).size() > 0;
+ }
+
+ private void updateLatestSchemaMap(String dbName, String tableName, String
tableKey) throws IOException {
+ //ToDo: after making sure all spec has topic.name set, we should use
topicName as key for schema
+ boolean alwaysUseLatestSchema =
useLatestTableSchemaAllowDenyList.acceptTable(dbName, tableName);
Review Comment:
To make the intent clearer, I think we should rename this variable to
alwaysUseExistingSchema. Since you are touching it now, I would also suggest
renaming the allow/deny list accordingly.
##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +229,69 @@ public void write(GobblinMetadataChangeEvent gmce,
Map<String, Collection<HiveSp
break;
}
default: {
- log.error("unsupported operation {}",
gmce.getOperationType().toString());
+ log.error("unsupported operation {}", opType);
return;
}
}
}
+ /**
+ * Helper function to gracefully handle errors when creating a hive table.
i.e. IOExceptions when creating the table
+ * are swallowed and logged to error
+ * @param tableSpec
+ * @param tableKey table key used to check if table is in spec cache
+ * @return if the table the table was created. If the table existed
beforehand, it still returns true.
+ */
+ private boolean createTable(HiveSpec tableSpec, String tableKey) {
+ try {
+ // no-op if it's in spec cache (spec cache contains tablekey for all db
/ tables created since last flush)
+ if (inHiveSpecCache(tableKey)) {
+ return true;
+ }
+
+ this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to create table. Skipping this event", e);
+ return false;
+ }
+ }
+
+ @Nullable
+ private String getTopicName(GobblinMetadataChangeEvent gmce) {
+ //Calculate the topic name from gmce, fall back to topic.name in hive spec
which can also be null
+ //todo: make topicName fall back to topic.name in hive spec so that we can
also get schema for re-write operation
+ String topicName = null;
+ if (gmce.getTopicPartitionOffsetsRange() != null &&
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+ String topicPartitionString =
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+ //In case the topic name is not the table name or the topic name
contains '-'
+ topicName = topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ }
+ return topicName;
+ }
+
+ /**
+ * We care about if a table key is in the spec cache because it means that
we have already created this table before
+ * since the last flush. Therefore, we can use this method to check whether
we need to create a table
+ * @param tableKey
+ * @return
+ */
+ private boolean inHiveSpecCache(String tableKey) {
+ return specMaps.containsKey(tableKey) && specMaps.get(tableKey).size() > 0;
+ }
+
+ private void updateLatestSchemaMap(String dbName, String tableName, String
tableKey) throws IOException {
Review Comment:
Please rename this method to updateLatestSchemaMapWithExistingSchema.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]