This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2e97f5661 [GOBBLIN-2078] remove configs because they may have a config
that is a prefix of another config (#3961)
2e97f5661 is described below
commit 2e97f5661b665b6e8e2ea889742682cae5706d1b
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Mon Jun 3 17:24:05 2024 -0700
[GOBBLIN-2078] remove configs because they may have a config that is a prefix
of another config (#3961)
---
.../troubleshooter/AutomaticTroubleshooterTest.java | 11 +++--------
.../apache/gobblin/runtime/AbstractJobLauncher.java | 6 +++---
.../gobblin/runtime/mapreduce/MRJobLauncher.java | 3 +--
.../troubleshooter/AutomaticTroubleshooterConfig.java | 19 +++++++++----------
.../AutomaticTroubleshooterFactory.java | 8 ++++----
.../AutomaticTroubleshooterFactoryTest.java | 8 ++------
.../ddm/activity/impl/CommitActivityImpl.java | 15 +++++++--------
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 12 +++++-------
.../ddm/activity/impl/ProcessWorkUnitImpl.java | 3 +--
9 files changed, 35 insertions(+), 50 deletions(-)
diff --git
a/gobblin-modules/gobblin-troubleshooter/src/test/java/org/apache/gobblin/troubleshooter/AutomaticTroubleshooterTest.java
b/gobblin-modules/gobblin-troubleshooter/src/test/java/org/apache/gobblin/troubleshooter/AutomaticTroubleshooterTest.java
index 331d253b5..66c21a872 100644
---
a/gobblin-modules/gobblin-troubleshooter/src/test/java/org/apache/gobblin/troubleshooter/AutomaticTroubleshooterTest.java
+++
b/gobblin-modules/gobblin-troubleshooter/src/test/java/org/apache/gobblin/troubleshooter/AutomaticTroubleshooterTest.java
@@ -28,7 +28,6 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
-import org.apache.gobblin.util.ConfigUtils;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
@@ -41,9 +40,7 @@ public class AutomaticTroubleshooterTest {
@Test
public void canCollectAndRefineIssues()
throws Exception {
- Properties properties = new Properties();
- AutomaticTroubleshooter troubleshooter =
-
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
+ AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(new Properties());
try {
troubleshooter.start();
log.warn("Test warning");
@@ -72,8 +69,7 @@ public class AutomaticTroubleshooterTest {
throws Exception {
Properties properties = new Properties();
properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "true");
- AutomaticTroubleshooter troubleshooter =
-
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
+ AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(properties);
try {
troubleshooter.start();
log.warn("Test warning");
@@ -96,8 +92,7 @@ public class AutomaticTroubleshooterTest {
Properties properties = new Properties();
properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "false");
properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING,
"true");
- AutomaticTroubleshooter troubleshooter =
-
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
+ AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(properties);
try {
troubleshooter.start();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index e0078803c..fd5c9bab6 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -31,9 +31,8 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-
import java.util.concurrent.atomic.AtomicInteger;
-import lombok.Getter;
+
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +56,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import javax.annotation.Nullable;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -205,7 +205,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
clusterNameTags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
GobblinMetrics.addCustomTagsToProperties(jobProps, clusterNameTags);
- troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps));
+ troubleshooter = AutomaticTroubleshooterFactory.createForJob(jobProps);
troubleshooter.start();
// Make a copy for both the system and job configuration properties and
resolve the job-template if any.
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
index f1c1fa319..7b01dc1a6 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
@@ -779,8 +779,7 @@ public class MRJobLauncher extends AbstractJobLauncher {
final State gobblinJobState =
HadoopUtils.getStateFromConf(context.getConfiguration());
TaskAttemptID taskAttemptID = context.getTaskAttemptID();
- troubleshooter =
-
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(gobblinJobState.getProperties()));
+ troubleshooter =
AutomaticTroubleshooterFactory.createForJob(gobblinJobState.getProperties());
troubleshooter.start();
try (Closer closer = Closer.create()) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterConfig.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterConfig.java
index 43af63328..d0c95acd0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterConfig.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterConfig.java
@@ -18,8 +18,7 @@
package org.apache.gobblin.runtime.troubleshooter;
import java.util.Objects;
-
-import com.typesafe.config.Config;
+import java.util.Properties;
import javax.inject.Inject;
import lombok.AllArgsConstructor;
@@ -27,7 +26,7 @@ import lombok.Builder;
import lombok.Getter;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PropertiesUtils;
/**
@@ -44,15 +43,15 @@ public class AutomaticTroubleshooterConfig {
private int inMemoryRepositoryMaxSize =
ConfigurationKeys.DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE;
@Inject
- public AutomaticTroubleshooterConfig(Config config) {
- Objects.requireNonNull(config, "Config cannot be null");
+ public AutomaticTroubleshooterConfig(Properties properties) {
+ Objects.requireNonNull(properties, "Properties cannot be null");
- disabled = ConfigUtils.getBoolean(config,
ConfigurationKeys.TROUBLESHOOTER_DISABLED, false);
- disableEventReporting =
- ConfigUtils.getBoolean(config,
ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING, false);
+ disabled = PropertiesUtils.getPropAsBoolean(properties,
ConfigurationKeys.TROUBLESHOOTER_DISABLED, "false");
+ disableEventReporting = PropertiesUtils.getPropAsBoolean(properties,
+ ConfigurationKeys.TROUBLESHOOTER_DISABLE_EVENT_REPORTING, "false");
- inMemoryRepositoryMaxSize = ConfigUtils
- .getInt(config,
ConfigurationKeys.TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE,
+ inMemoryRepositoryMaxSize = PropertiesUtils.getPropAsInt(properties,
+ ConfigurationKeys.TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE,
ConfigurationKeys.DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE);
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactory.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactory.java
index 6d6080e2b..9c4925de9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactory.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactory.java
@@ -17,7 +17,7 @@
package org.apache.gobblin.runtime.troubleshooter;
-import com.typesafe.config.Config;
+import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
@@ -37,10 +37,10 @@ public class AutomaticTroubleshooterFactory {
* If this module is missing, troubleshooter will default to a no-op
implementation.
*
* In addition, even when the "gobblin-troubleshooter" module is present,
troubleshooter can still be disabled
- * with {@link ConfigurationKeys.TROUBLESHOOTER_DISABLED} setting.
+ * with {@link ConfigurationKeys#TROUBLESHOOTER_DISABLED} setting.
* */
- public static AutomaticTroubleshooter createForJob(Config config) {
- AutomaticTroubleshooterConfig troubleshooterConfig = new
AutomaticTroubleshooterConfig(config);
+ public static AutomaticTroubleshooter createForJob(Properties properties) {
+ AutomaticTroubleshooterConfig troubleshooterConfig = new
AutomaticTroubleshooterConfig(properties);
Class troubleshooterClass = tryGetTroubleshooterClass();
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactoryTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactoryTest.java
index 0aa7d20e8..fc23c8ed6 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactoryTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooterFactoryTest.java
@@ -22,7 +22,6 @@ import java.util.Properties;
import org.testng.annotations.Test;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.util.ConfigUtils;
import static org.junit.Assert.assertTrue;
@@ -33,9 +32,7 @@ public class AutomaticTroubleshooterFactoryTest {
public void willGetNoopTroubleshooterByDefault() {
// This test project does not reference gobblin-troubleshooter module, so
we should get a noop-instance
// of troubleshooter. See the main AutomaticTroubleshooterFactory class
for details.
- Properties properties = new Properties();
- AutomaticTroubleshooter troubleshooter =
-
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
+ AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(new Properties());
assertTrue(troubleshooter instanceof NoopAutomaticTroubleshooter);
}
@@ -44,8 +41,7 @@ public class AutomaticTroubleshooterFactoryTest {
public void willGetNoopTroubleshooterWhenDisabled() {
Properties properties = new Properties();
properties.put(ConfigurationKeys.TROUBLESHOOTER_DISABLED, "true");
- AutomaticTroubleshooter troubleshooter =
-
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(properties));
+ AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(properties);
assertTrue(troubleshooter instanceof NoopAutomaticTroubleshooter);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index f409e5108..e8490d714 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -29,18 +29,18 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import io.temporal.failure.ApplicationFailure;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import io.temporal.failure.ApplicationFailure;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
@@ -54,16 +54,15 @@ import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
-import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -87,7 +86,7 @@ public class CommitActivityImpl implements CommitActivity {
JobState jobState = Help.loadJobState(workSpec, fs);
optJobName = Optional.ofNullable(jobState.getJobName());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
- troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
+ troubleshooter =
AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState,
numDeserializationThreads);
if (taskStates.isEmpty()) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index ace8bedc1..0ed0675f6 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -21,16 +21,15 @@ import java.io.IOException;
import java.util.List;
import java.util.Properties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import com.google.api.client.util.Lists;
import com.google.common.io.Closer;
import io.temporal.failure.ApplicationFailure;
-
import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
@@ -45,10 +44,9 @@ import
org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
-import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -70,7 +68,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
// jobState.setBroker(jobBroker);
// jobState.setWorkUnitAndDatasetStateFunctional(new
CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
- AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobProps));
+ AutomaticTroubleshooter troubleshooter =
AutomaticTroubleshooterFactory.createForJob(jobProps);
troubleshooter.start();
try (Closer closer = Closer.create()) {
// before embarking on (potentially expensive) WU creation, first
pre-check that the FS is available
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
index f11ac7068..a6753245b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java
@@ -56,7 +56,6 @@ import
org.apache.gobblin.temporal.ddm.activity.ProcessWorkUnit;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
@@ -76,7 +75,7 @@ public class ProcessWorkUnitImpl implements ProcessWorkUnit {
List<WorkUnit> workUnits = loadFlattenedWorkUnits(wu, fs);
log.info("{} - loaded; found {} workUnits", correlator,
workUnits.size());
JobState jobState = Help.loadJobState(wu, fs);
- troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
+ troubleshooter =
AutomaticTroubleshooterFactory.createForJob(jobState.getProperties());
troubleshooter.start();
return execute(workUnits, wu, jobState, fs,
troubleshooter.getIssueRepository());
} catch (IOException | InterruptedException e) {