This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3cc50cd [GOBBLIN-1272] Fixing bug in loading config-store when the
entry is empty
3cc50cd is described below
commit 3cc50cd9b2576fbfff5e58c8668997f57c2864bc
Author: Lei Sun <[email protected]>
AuthorDate: Tue Sep 29 13:55:07 2020 -0700
[GOBBLIN-1272] Fixing bug in loading config-store when the entry is empty
An attempt to fix string loading problem
Add a simple unit test
Address comments and remove unnecessary code
Remove unused imports
Closes #3112 from
autumnust/fixStringLoadingFromGCS
---
.../hive/policy/HiveRegistrationPolicyBase.java | 7 +++----
.../extractor/extract/kafka/ConfigStoreUtils.java | 22 ++++++++--------------
.../extract/kafka/ConfigStoreUtilsTest.java | 7 ++++---
3 files changed, 15 insertions(+), 21 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index a0270ca..525a0be 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -56,6 +56,7 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.util.ConfigUtils;
/**
@@ -178,8 +179,7 @@ public class HiveRegistrationPolicyBase implements
HiveRegistrationPolicy {
}
if (configForTopic.isPresent() &&
configForTopic.get().hasPath(ADDITIONAL_HIVE_DATABASE_NAMES)) {
- databaseNames.addAll(
-
ConfigStoreUtils.getListOfValuesFromConfigStore(configForTopic.get(),
ADDITIONAL_HIVE_DATABASE_NAMES).stream()
+ databaseNames.addAll(ConfigUtils.getStringList(configForTopic.get(),
ADDITIONAL_HIVE_DATABASE_NAMES).stream()
.map(x -> this.dbNamePrefix + x +
this.dbNameSuffix).collect(Collectors.toList()));
} else if
(!Strings.isNullOrEmpty(this.props.getProp(ADDITIONAL_HIVE_DATABASE_NAMES))) {
for (String additionalDbName :
this.props.getPropAsList(ADDITIONAL_HIVE_DATABASE_NAMES)) {
@@ -261,8 +261,7 @@ public class HiveRegistrationPolicyBase implements
HiveRegistrationPolicy {
// Searching additional table name from ConfigStore-returned object.
if (primaryTableName.isPresent() && configForTopic.isPresent() &&
configForTopic.get().hasPath(additionalNamesProp)) {
- for (String additionalTableName : ConfigStoreUtils
- .getListOfValuesFromConfigStore(configForTopic.get(),
additionalNamesProp)) {
+ for (String additionalTableName :
ConfigUtils.getStringList(configForTopic.get(), additionalNamesProp)) {
String resolvedTableName =
StringUtils.replace(additionalTableName, PRIMARY_TABLE_TOKEN,
primaryTableName.get());
tableNames.add(this.tableNamePrefix + resolvedTableName +
this.tableNameSuffix);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
index 8768110..1806e3c 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtils.java
@@ -26,6 +26,14 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.ConfigClientUtils;
import
org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
@@ -37,14 +45,6 @@ import
org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
import org.apache.gobblin.util.PathUtils;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.typesafe.config.Config;
-
-import lombok.extern.slf4j.Slf4j;
@Slf4j
@@ -258,10 +258,4 @@ public class ConfigStoreUtils {
log.warn("None of the blacklist or whitelist tags are provided");
}
}
-
- public static List<String> getListOfValuesFromConfigStore(Config config,
String keyValue) {
- return Splitter.on(",")
- .trimResults()
- .splitToList(config.getString(keyValue));
- }
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java
index d296937..d4d5232 100644
---
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/ConfigStoreUtilsTest.java
@@ -24,9 +24,6 @@ import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
-import org.apache.gobblin.config.client.ConfigClient;
-import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.hadoop.fs.Path;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -38,6 +35,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import org.apache.gobblin.config.client.ConfigClient;
+import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+
import static
org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_ENABLED;
import static
org.apache.gobblin.configuration.ConfigurationKeys.CONFIG_MANAGEMENT_STORE_URI;
import static
org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils.GOBBLIN_CONFIG_COMMONPATH;