[
https://issues.apache.org/jira/browse/GOBBLIN-1775?focusedWorklogId=844446&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-844446
]
ASF GitHub Bot logged work on GOBBLIN-1775:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Feb/23 19:57
Start Date: 08/Feb/23 19:57
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3633:
URL: https://github.com/apache/gobblin/pull/3633#discussion_r1100604690
##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +229,69 @@ public void write(GobblinMetadataChangeEvent gmce,
Map<String, Collection<HiveSp
break;
}
default: {
- log.error("unsupported operation {}",
gmce.getOperationType().toString());
+ log.error("unsupported operation {}", opType);
return;
}
}
}
+ /**
+ * Helper function to gracefully handle errors when creating a hive table.
i.e. IOExceptions when creating the table
+ * are swallowed and logged to error
+ * @param tableSpec
+ * @param tableKey table key used to check if table is in spec cache
+ * @return if the table the table was created. If the table existed
beforehand, it still returns true.
+ */
+ private boolean createTable(HiveSpec tableSpec, String tableKey) {
+ try {
+ // no-op if it's in spec cache (spec cache contains tablekey for all db
/ tables created since last flush)
+ if (inHiveSpecCache(tableKey)) {
+ return true;
+ }
+
+ this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to create table. Skipping this event", e);
+ return false;
+ }
+ }
+
+ @Nullable
+ private String getTopicName(GobblinMetadataChangeEvent gmce) {
+ //Calculate the topic name from gmce, fall back to topic.name in hive spec
which can also be null
+ //todo: make topicName fall back to topic.name in hive spec so that we can
also get schema for re-write operation
+ String topicName = null;
+ if (gmce.getTopicPartitionOffsetsRange() != null &&
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+ String topicPartitionString =
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+ //In case the topic name is not the table name or the topic name
contains '-'
+ topicName = topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ }
+ return topicName;
+ }
+
+ /**
+ * We care about if a table key is in the spec cache because it means that
we have already created this table before
+ * since the last flush. Therefore, we can use this method to check whether
we need to create a table
+ * @param tableKey
+ * @return
+ */
+ private boolean inHiveSpecCache(String tableKey) {
+ return specMaps.containsKey(tableKey) && specMaps.get(tableKey).size() > 0;
+ }
+
+ private void updateLatestSchemaMap(String dbName, String tableName, String
tableKey) throws IOException {
+ //ToDo: after making sure all spec has topic.name set, we should use
topicName as key for schema
+ boolean alwaysUseLatestSchema =
useLatestTableSchemaAllowDenyList.acceptTable(dbName, tableName);
Review Comment:
To make this clearer, I think we should rename this variable to
alwaysUseExistingSchema, and I would suggest changing the allowDenyList
(useLatestTableSchemaAllowDenyList) accordingly, since you are touching it now.
##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +229,69 @@ public void write(GobblinMetadataChangeEvent gmce,
Map<String, Collection<HiveSp
break;
}
default: {
- log.error("unsupported operation {}",
gmce.getOperationType().toString());
+ log.error("unsupported operation {}", opType);
return;
}
}
}
+ /**
+ * Helper function to gracefully handle errors when creating a hive table.
i.e. IOExceptions when creating the table
+ * are swallowed and logged to error
+ * @param tableSpec
+ * @param tableKey table key used to check if table is in spec cache
+ * @return if the table the table was created. If the table existed
beforehand, it still returns true.
+ */
+ private boolean createTable(HiveSpec tableSpec, String tableKey) {
+ try {
+ // no-op if it's in spec cache (spec cache contains tablekey for all db
/ tables created since last flush)
+ if (inHiveSpecCache(tableKey)) {
+ return true;
+ }
+
+ this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to create table. Skipping this event", e);
+ return false;
+ }
+ }
+
+ @Nullable
+ private String getTopicName(GobblinMetadataChangeEvent gmce) {
+ //Calculate the topic name from gmce, fall back to topic.name in hive spec
which can also be null
+ //todo: make topicName fall back to topic.name in hive spec so that we can
also get schema for re-write operation
+ String topicName = null;
+ if (gmce.getTopicPartitionOffsetsRange() != null &&
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+ String topicPartitionString =
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+ //In case the topic name is not the table name or the topic name
contains '-'
+ topicName = topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ }
+ return topicName;
+ }
+
+ /**
+ * We care about if a table key is in the spec cache because it means that
we have already created this table before
+ * since the last flush. Therefore, we can use this method to check whether
we need to create a table
+ * @param tableKey
+ * @return
+ */
+ private boolean inHiveSpecCache(String tableKey) {
+ return specMaps.containsKey(tableKey) && specMaps.get(tableKey).size() > 0;
+ }
+
+ private void updateLatestSchemaMap(String dbName, String tableName, String
tableKey) throws IOException {
Review Comment:
Please rename this method to updateLatestSchemaMapWithExistingSchema.
Issue Time Tracking
-------------------
Worklog Id: (was: 844446)
Time Spent: 1h (was: 50m)
> Add logic in GMIP to not fail the container when we cannot create table
> -----------------------------------------------------------------------
>
> Key: GOBBLIN-1775
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1775
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)