homatthew commented on code in PR #3633:
URL: https://github.com/apache/gobblin/pull/3633#discussion_r1093517280


##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +227,63 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
         break;
       }
       default: {
-        log.error("unsupported operation {}", gmce.getOperationType().toString());
+        log.error("unsupported operation {}", opType);
         return;
       }
     }
   }
 
+  /**
+   * Helper function to gracefully handle errors when creating a hive table. Existence is not an error.
+   * IOExceptions when creating the table are swallowed and logged to error
+   * @param tableSpec
+   * @return true if the table the table was created without error. Existence of the table is not an error
+   */
+  private boolean createTable(HiveSpec tableSpec) {
+    try {
+      this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+      return true;
+    } catch (IOException e) {
+      log.error("Failed to create table. Skipping this event", e);
+      return false;
+    }
+  }
+
+  @Nullable
+  private String getTopicName(GobblinMetadataChangeEvent gmce) {
+    //Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
+    //todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation

Review Comment:
   Pre-existing comments / TODOs.



##########
gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java:
##########
@@ -232,12 +227,63 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
         break;
       }
       default: {
-        log.error("unsupported operation {}", gmce.getOperationType().toString());
+        log.error("unsupported operation {}", opType);
         return;
       }
     }
   }
 
+  /**
+   * Helper function to gracefully handle errors when creating a hive table. Existence is not an error.
+   * IOExceptions when creating the table are swallowed and logged to error
+   * @param tableSpec
+   * @return true if the table the table was created without error. Existence of the table is not an error
+   */
+  private boolean createTable(HiveSpec tableSpec) {
+    try {
+      this.hiveRegister.createTableIfNotExists(tableSpec.getTable());
+      return true;
+    } catch (IOException e) {
+      log.error("Failed to create table. Skipping this event", e);
+      return false;
+    }
+  }
+
+  @Nullable
+  private String getTopicName(GobblinMetadataChangeEvent gmce) {
+    //Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
+    //todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation
+    String topicName = null;
+    if (gmce.getTopicPartitionOffsetsRange() != null && !gmce.getTopicPartitionOffsetsRange().isEmpty()) {
+      String topicPartitionString = gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
+      //In case the topic name is not the table name or the topic name contains '-'
+      topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
+    }
+    return topicName;
+  }
+
+  /**
+   * We care about if a table key is in the spec cache because it means that we have already created this table before
+   * since the last flush. Therefore, we can use this method to check whether we need to create a table
+   * @param tableKey
+   * @return
+   */
+  private boolean inHiveSpecCache(String tableKey) {
+    return specMaps.containsKey(tableKey) && specMaps.get(tableKey).size() > 0;
+  }
+
+  private void updateLatestSchemaMap(String dbName, String tableName, String tableKey) throws IOException {
+    //ToDo: after making sure all spec has topic.name set, we should use topicName as key for schema

Review Comment:
   Pre-existing comments / TODOs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
