homatthew commented on code in PR #3633:
URL: https://github.com/apache/gobblin/pull/3633#discussion_r1093517280
##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +227,63 @@ public void write(GobblinMetadataChangeEvent gmce,
Map<String, Collection<HiveSp
break;
}
default: {
- log.error("unsupported operation {}",
gmce.getOperationType().toString());
+ log.error("unsupported operation {}", opType);
return;
}
}
}
+ /**
+ * Helper function to gracefully handle errors when creating a hive table.
Existence is not an error.
+ * IOExceptions when creating the table are swallowed and logged to error
+ * @param tableSpec
+ * @return true if the table the table was created without error. Existence
of the table is not an error
+ */
+ private boolean createTable(HiveSpec tableSpec) {
+ try {
+ this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to create table. Skipping this event", e);
+ return false;
+ }
+ }
+
+ @Nullable
+ private String getTopicName(GobblinMetadataChangeEvent gmce) {
+ //Calculate the topic name from gmce, fall back to topic.name in hive spec
which can also be null
+ //todo: make topicName fall back to topic.name in hive spec so that we can
also get schema for re-write operation
Review Comment:
Pre-existing comments / TODOs.
##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +227,63 @@ public void write(GobblinMetadataChangeEvent gmce,
Map<String, Collection<HiveSp
break;
}
default: {
- log.error("unsupported operation {}",
gmce.getOperationType().toString());
+ log.error("unsupported operation {}", opType);
return;
}
}
}
+ /**
+ * Helper function to gracefully handle errors when creating a hive table.
Existence is not an error.
+ * IOExceptions when creating the table are swallowed and logged to error
+ * @param tableSpec
+ * @return true if the table the table was created without error. Existence
of the table is not an error
+ */
+ private boolean createTable(HiveSpec tableSpec) {
+ try {
+ this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+ return true;
+ } catch (IOException e) {
+ log.error("Failed to create table. Skipping this event", e);
+ return false;
+ }
+ }
+
+ @Nullable
+ private String getTopicName(GobblinMetadataChangeEvent gmce) {
+ //Calculate the topic name from gmce, fall back to topic.name in hive spec
which can also be null
+ //todo: make topicName fall back to topic.name in hive spec so that we can
also get schema for re-write operation
+ String topicName = null;
+ if (gmce.getTopicPartitionOffsetsRange() != null &&
!gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+ String topicPartitionString =
gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+ //In case the topic name is not the table name or the topic name
contains '-'
+ topicName = topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ }
+ return topicName;
+ }
+
+ /**
+ * We care about if a table key is in the spec cache because it means that
we have already created this table before
+ * since the last flush. Therefore, we can use this method to check whether
we need to create a table
+ * @param tableKey
+ * @return
+ */
+ private boolean inHiveSpecCache(String tableKey) {
+ return specMaps.containsKey(tableKey) && specMaps.get(tableKey).size() > 0;
+ }
+
+ private void updateLatestSchemaMap(String dbName, String tableName, String
tableKey) throws IOException {
+ //ToDo: after making sure all spec has topic.name set, we should use
topicName as key for schema
Review Comment:
Pre-existing comments / TODOs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]