This is an automated email from the ASF dual-hosted git repository. lesun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gobblin.git
commit 2c314647f803ef7d91b90d276199e7b58d5f8d7d Author: Zihan Li <[email protected]> AuthorDate: Tue May 11 14:59:06 2021 -0700 [GOBBLIN-1442]Fix the bug of NoSuchElement Exception in HiveWriter --- .../gobblin/hive/writer/HiveMetadataWriter.java | 29 +++++++++++----------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java index a4550bc..b11948c 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java @@ -95,20 +95,21 @@ public class HiveMetadataWriter implements MetadataWriter { @Override public void flush(String dbName, String tableName) throws IOException { String tableKey = tableNameJoiner.join(dbName, tableName); - log.info("start to flush table: " + tableKey); - HashMap<List<String>, ListenableFuture<Void>> executionMap = - this.currentExecutionMap.computeIfAbsent(tableKey, s -> new HashMap<>()); - //iterator all execution to get the result to make sure they all succeeded - for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) { - try { - execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - log.error("Error when getting the result of registration for table" + tableKey); - throw new RuntimeException(e); + if(this.currentExecutionMap.containsKey(tableKey)) { + log.info("start to flush table: " + tableKey); + HashMap<List<String>, ListenableFuture<Void>> executionMap = this.currentExecutionMap.get(tableKey); + //iterator all execution to get the result to make sure they all succeeded + for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) { + try { + execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("Error when getting the result of registration for table" + tableKey); + throw new RuntimeException(e); + } } + executionMap.clear(); + log.info("finish flushing table: " + tableKey); } - executionMap.clear(); - log.info("finish flushing table: " + tableKey); } public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, @@ -131,7 +132,7 @@ public class HiveMetadataWriter implements MetadataWriter { //Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null //todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation String topicName = null; - if (gmce.getTopicPartitionOffsetsRange() != null) { + if (gmce.getTopicPartitionOffsetsRange() != null && !gmce.getTopicPartitionOffsetsRange().isEmpty()) { String topicPartitionString = gmce.getTopicPartitionOffsetsRange().keySet().iterator().next(); //In case the topic name is not the table name or the topic name contains '-' topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-')); @@ -278,7 +279,7 @@ public class HiveMetadataWriter implements MetadataWriter { if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) { write(gmce, newSpecsMap, oldSpecsMap, tableSpec); } else { - log.info(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(), + log.debug(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())); } }
