jack-moseley commented on a change in pull request #3296:
URL: https://github.com/apache/gobblin/pull/3296#discussion_r652298706



##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/DefaultIssueRefinery.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.google.common.collect.ImmutableList;
+
+
+/**
+ * Provides basic issue sorting and cleanup.
+ *
+ * Issue refining will be improved in future PRs
+ */
+public class DefaultIssueRefinery implements IssueRefinery {
+  @Override
+  public List<Issue> refine(ImmutableList<Issue> sourceIssues) {
+    LinkedList<Issue> issues = new LinkedList<>(sourceIssues);
+
+    // Kafka warnings usually don't impact the job outcome, so we are hiding them from the user
+    issues.removeIf(i -> i.getSeverity().isEqualOrLowerThan(IssueSeverity.WARN) && StringUtils
+        .containsIgnoreCase(i.getSourceClass(), "com.linkedin.kafka"));
+    issues.removeIf(i -> i.getSeverity().isEqualOrLowerThan(IssueSeverity.WARN) && StringUtils
+        .containsIgnoreCase(i.getSourceClass(), "org.apache.kafka"));
+
+    moveToBottom(issues, i -> StringUtils.containsIgnoreCase(i.getSourceClass(), "org.apache.gobblin.metrics"));

Review comment:
       Let's include a comment explaining each rule in the issue refinery. I'm assuming this one is because metrics-related issues are often non-fatal.
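A minimal sketch of what the commented rules might look like. `Severity` and `SimpleIssue` are simplified stand-ins for Gobblin's `IssueSeverity` and `Issue` (reduced to the fields the rules inspect), `moveToBottom` is reimplemented because its real signature isn't shown in the diff, and the rationale given for the metrics rule is the reviewer's guess rather than something the PR states. The two Kafka `removeIf` calls from the diff are merged into one, which removes the same issues.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;

// Stand-in for Gobblin's IssueSeverity; constants are declared from lowest to highest.
enum Severity {
  DEBUG, INFO, WARN, ERROR, FATAL;

  boolean isEqualOrLowerThan(Severity other) {
    return compareTo(other) <= 0;
  }
}

// Stand-in for Gobblin's Issue, reduced to the fields the rules inspect.
final class SimpleIssue {
  final Severity severity;
  final String sourceClass;

  SimpleIssue(Severity severity, String sourceClass) {
    this.severity = severity;
    this.sourceClass = sourceClass;
  }
}

final class CommentedIssueRefinery {

  List<SimpleIssue> refine(List<SimpleIssue> sourceIssues) {
    LinkedList<SimpleIssue> issues = new LinkedList<>(sourceIssues);

    // Rule 1: warnings (and anything less severe) from the LinkedIn and Apache Kafka clients
    // usually don't affect the job outcome, so they are hidden. Kafka errors are kept.
    issues.removeIf(i -> i.severity.isEqualOrLowerThan(Severity.WARN)
        && (containsIgnoreCase(i.sourceClass, "com.linkedin.kafka")
            || containsIgnoreCase(i.sourceClass, "org.apache.kafka")));

    // Rule 2 (rationale assumed): problems in metrics reporting are often non-fatal, so they
    // are kept, but listed after all other issues.
    moveToBottom(issues, i -> containsIgnoreCase(i.sourceClass, "org.apache.gobblin.metrics"));

    return issues;
  }

  // Moves matching issues to the end of the list, preserving their relative order.
  private static void moveToBottom(List<SimpleIssue> issues, Predicate<SimpleIssue> predicate) {
    List<SimpleIssue> moved = new ArrayList<>();
    for (Iterator<SimpleIssue> it = issues.iterator(); it.hasNext(); ) {
      SimpleIssue issue = it.next();
      if (predicate.test(issue)) {
        moved.add(issue);
        it.remove();
      }
    }
    issues.addAll(moved);
  }

  // Null-safe, like commons-lang StringUtils.containsIgnoreCase.
  private static boolean containsIgnoreCase(String text, String fragment) {
    return text != null
        && text.toLowerCase(Locale.ROOT).contains(fragment.toLowerCase(Locale.ROOT));
  }
}
```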

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.text.TextStringBuilder;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Automatic troubleshooter will identify and prioritize the problems with the job, and display a summary to the user.
+ *
+ * Troubleshooter will collect errors & warnings from logs and combine them with various health checks. After that
+ * you can {@link #refineIssues()} to prioritize them and filter out noise, and then {@link #logIssueSummary()}
+ * to show a human-readable list of issues.
+ *
+ * Implementation and architecture notes:
+ *
+ * We convert log messages and health check results to {@link Issue}s. They will be shown to the user at the end of
+ * the job log. To avoid overwhelming the user, we will only collect a fixed number of issues, and will de-duplicate
+ * them, so that each type of problem is shown only once.
+ *
+ * Issues will be emitted in GobblinTrackingEvents at the end of the job, so that they can be collected by Gobblin
+ * service, and used for future platform-wide analysis.
+ *
+ * */
+@Slf4j
+@Singleton
+public class AutomaticTroubleshooter extends AbstractIdleService {
+
+  private final IssueRepository issueRepository;
+  private AutoTroubleshooterLogAppender troubleshooterLogger;
+
+  @Inject
+  public AutomaticTroubleshooter(IssueRepository issueRepository) {
+    this.issueRepository = Objects.requireNonNull(issueRepository);
+  }
+
+  @Override
+  protected void startUp()
+      throws Exception {
+    setupLogAppender();
+  }
+
+  @Override
+  protected void shutDown()
+      throws Exception {
+    removeLogAppender();
+  }
+
+  private void setupLogAppender() {
+    org.apache.log4j.Logger rootLogger = LogManager.getRootLogger();
+
+    troubleshooterLogger = new AutoTroubleshooterLogAppender(issueRepository);
+    troubleshooterLogger.setThreshold(Level.WARN);
+    troubleshooterLogger.activateOptions();
+    rootLogger.addAppender(troubleshooterLogger);

Review comment:
       Am I understanding this correctly that this logger will essentially convert every log line of level WARN or higher (even if there is no stacktrace) to an Issue in the repository?
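For reference, a log4j appender whose threshold is `WARN` receives every event at `WARN` or above that reaches the root logger, whether or not the event carries a throwable, so the answer likely hinges on what the appender and repository then do with those events. The sketch below models that with `java.util.logging` (a JDK stand-in for log4j; this is not the PR's `AutoTroubleshooterLogAppender`) and adds the fixed-size cap and de-duplication that the class javadoc describes. The dedup key (logger name plus message) and the `maxIssues` parameter are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;

// Hypothetical stand-in for AutoTroubleshooterLogAppender plus an in-memory repository,
// written against java.util.logging instead of log4j.
final class IssueCollectingHandler extends Handler {
  private final int maxIssues;
  // Insertion-ordered; the key identifies "the same problem" for de-duplication.
  private final Map<String, String> issues = new LinkedHashMap<>();
  private int processedEventCount = 0;

  IssueCollectingHandler(int maxIssues) {
    this.maxIssues = maxIssues;
    setLevel(Level.WARNING); // plays the role of setThreshold(Level.WARN)
  }

  @Override
  public synchronized void publish(LogRecord record) {
    if (!isLoggable(record)) {
      return; // below the threshold (INFO, FINE, ...): never becomes an issue
    }
    processedEventCount++;
    // Every record at WARNING or above is a candidate, with or without a throwable.
    String key = record.getLoggerName() + "|" + record.getMessage();
    if (issues.containsKey(key) || issues.size() >= maxIssues) {
      return; // duplicate of an existing issue, or the fixed budget is used up
    }
    String summary = record.getLevel() + " " + record.getMessage()
        + (record.getThrown() != null ? " (" + record.getThrown() + ")" : "");
    issues.put(key, summary);
  }

  synchronized List<String> getIssues() {
    return new ArrayList<>(issues.values());
  }

  synchronized int getProcessedEventCount() {
    return processedEventCount;
  }

  @Override
  public void flush() {
  }

  @Override
  public void close() {
  }
}
```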

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.text.TextStringBuilder;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Automatic troubleshooter will identify and prioritize the problems with the job, and display a summary to the user.
+ *
+ * Troubleshooter will collect errors & warnings from logs and combine them with various health checks. After that
+ * you can {@link #refineIssues()} to prioritize them and filter out noise, and then {@link #logIssueSummary()}
+ * to show a human-readable list of issues.
+ *
+ * Implementation and architecture notes:
+ *
+ * We convert log messages and health check results to {@link Issue}s. They will be shown to the user at the end of
+ * the job log. To avoid overwhelming the user, we will only collect a fixed number of issues, and will de-duplicate
+ * them, so that each type of problem is shown only once.
+ *
+ * Issues will be emitted in GobblinTrackingEvents at the end of the job, so that they can be collected by Gobblin
+ * service, and used for future platform-wide analysis.
+ *
+ * */
+@Slf4j
+@Singleton
+public class AutomaticTroubleshooter extends AbstractIdleService {
+
+  private final IssueRepository issueRepository;
+  private AutoTroubleshooterLogAppender troubleshooterLogger;
+
+  @Inject
+  public AutomaticTroubleshooter(IssueRepository issueRepository) {
+    this.issueRepository = Objects.requireNonNull(issueRepository);
+  }
+
+  @Override
+  protected void startUp()
+      throws Exception {
+    setupLogAppender();
+  }
+
+  @Override
+  protected void shutDown()
+      throws Exception {
+    removeLogAppender();
+  }
+
+  private void setupLogAppender() {
+    org.apache.log4j.Logger rootLogger = LogManager.getRootLogger();
+
+    troubleshooterLogger = new AutoTroubleshooterLogAppender(issueRepository);
+    troubleshooterLogger.setThreshold(Level.WARN);
+    troubleshooterLogger.activateOptions();
+    rootLogger.addAppender(troubleshooterLogger);
+
+    log.info("Configured logger for automatic troubleshooting");
+  }
+
+  private void removeLogAppender() {
+    org.apache.log4j.Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.removeAppender(troubleshooterLogger);
+    log.info("Removed logger for automatic troubleshooting. Processed {} events.",
+             troubleshooterLogger.getProcessedEventCount());
+  }
+
+  /**
+   * This method will sort, filter and enhance the list of issues to make it more meaningful for the user.
+   */
+  public void refineIssues()
+      throws TroubleshooterException {
+    IssueRefinery refinery = new DefaultIssueRefinery();

Review comment:
       Should there be a way to use a type of refinery other than `DefaultIssueRefinery`? Maybe by passing it through the constructor, similar to the `IssueRepository`.
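One way this could look, sketched with simplified stand-in types: the refinery becomes a constructor argument next to the repository, and a convenience constructor falls back to a default. `Refinery`, `PassThroughRefinery`, and `ConfigurableTroubleshooter` are hypothetical names, and issues are reduced to strings. Note that JSR-330 allows `@Inject` on at most one constructor per class, so with the PR's setup only the two-argument constructor would carry it, and a binding for the refinery would be needed wherever `IssueRepository` is bound.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Simplified stand-in for the IssueRefinery interface (issues reduced to strings).
interface Refinery {
  List<String> refine(List<String> issues);
}

// Stand-in for DefaultIssueRefinery: returns a copy of the issues unchanged.
final class PassThroughRefinery implements Refinery {
  @Override
  public List<String> refine(List<String> issues) {
    return new ArrayList<>(issues);
  }
}

final class ConfigurableTroubleshooter {
  private final List<String> repository;
  private final Refinery refinery;

  // Convenience constructor: existing call sites keep getting the default refinery.
  ConfigurableTroubleshooter(List<String> repository) {
    this(repository, new PassThroughRefinery());
  }

  // Refinery supplied alongside the repository (the constructor that would carry @Inject).
  ConfigurableTroubleshooter(List<String> repository, Refinery refinery) {
    this.repository = Objects.requireNonNull(repository);
    this.refinery = Objects.requireNonNull(refinery);
  }

  List<String> refineIssues() {
    return refinery.refine(Collections.unmodifiableList(repository));
  }
}
```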

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRJobLauncher.java
##########
@@ -737,11 +738,19 @@ private static FileSystem buildFileSystem(Properties jobProps, Configuration con
     // A list of WorkUnits (flattened for MultiWorkUnits) to be run by this mapper
     private final List<WorkUnit> workUnits = Lists.newArrayList();
 
+    private AutomaticTroubleshooter troubleshooter;
+    private IssueRepository issueRepository;
+
     @Override
     protected void setup(Context context) {
       final State gobblinJobState = HadoopUtils.getStateFromConf(context.getConfiguration());
       TaskAttemptID taskAttemptID = context.getTaskAttemptID();
 
+      issueRepository = new InMemoryIssueRepository();
+      troubleshooter = new AutomaticTroubleshooter(issueRepository);
+
+      troubleshooter.startAsync().awaitRunning();

Review comment:
       This one should probably have a 1-minute timeout as well, to be consistent with the other starts/stops of the troubleshooter.
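With Guava's `Service` API the suggested change would presumably be `troubleshooter.startAsync().awaitRunning(1, TimeUnit.MINUTES);`, which throws `java.util.concurrent.TimeoutException` if the service has not reached the RUNNING state in time. Since Guava isn't assumed to be on the classpath here, the JDK-only sketch below illustrates the same bounded-wait idea with `CompletableFuture` instead; `BoundedStartup.startWithTimeout` is a hypothetical helper, not part of either API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedStartup {
  // Runs the start-up action asynchronously and waits at most `timeout` for it.
  // Returns true if it completed in time, false on timeout or interruption.
  // On timeout the start-up task is not cancelled; it keeps running in the background.
  static boolean startWithTimeout(Runnable startUp, long timeout, TimeUnit unit) {
    CompletableFuture<Void> started = CompletableFuture.runAsync(startUp);
    try {
      started.get(timeout, unit);
      return true;
    } catch (TimeoutException e) {
      return false; // caller can log and continue instead of blocking mapper setup forever
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (ExecutionException e) {
      throw new IllegalStateException("Start-up failed", e.getCause());
    }
  }
}
```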

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.text.TextStringBuilder;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Automatic troubleshooter will identify and prioritize the problems with the job, and display a summary to the user.
+ *
+ * Troubleshooter will collect errors & warnings from logs and combine them with various health checks. After that
+ * you can {@link #refineIssues()} to prioritize them and filter out noise, and then {@link #logIssueSummary()}
+ * to show a human-readable list of issues.
+ *
+ * Implementation and architecture notes:
+ *
+ * We convert log messages and health check results to {@link Issue}s. They will be shown to the user at the end of
+ * the job log. To avoid overwhelming the user, we will only collect a fixed number of issues, and will de-duplicate
+ * them, so that each type of problem is shown only once.
+ *
+ * Issues will be emitted in GobblinTrackingEvents at the end of the job, so that they can be collected by Gobblin
+ * service, and used for future platform-wide analysis.
+ *
+ * */
+@Slf4j
+@Singleton
+public class AutomaticTroubleshooter extends AbstractIdleService {
+
+  private final IssueRepository issueRepository;
+  private AutoTroubleshooterLogAppender troubleshooterLogger;
+
+  @Inject
+  public AutomaticTroubleshooter(IssueRepository issueRepository) {
+    this.issueRepository = Objects.requireNonNull(issueRepository);
+  }
+
+  @Override
+  protected void startUp()
+      throws Exception {
+    setupLogAppender();
+  }
+
+  @Override
+  protected void shutDown()
+      throws Exception {
+    removeLogAppender();
+  }

Review comment:
       I'm not sure I understand why `AutomaticTroubleshooter` is an async service. Couldn't we just directly call the methods to set up and remove the log appender? It doesn't actually do anything asynchronously, does it?
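For comparison, a synchronous design could be a plain `AutoCloseable` that attaches a handler to the root logger when constructed and detaches it in `close()`, so try-with-resources guarantees cleanup without a service lifecycle. This sketch uses `java.util.logging` as a stand-in for the log4j root logger used in the PR, and `ScopedLogCapture` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical synchronous replacement for startUp()/shutDown(): no service threads involved.
final class ScopedLogCapture implements AutoCloseable {
  private final Logger rootLogger = Logger.getLogger("");
  private final List<String> captured = new ArrayList<>();
  private final Handler handler = new Handler() {
    @Override
    public void publish(LogRecord record) {
      if (isLoggable(record)) {
        synchronized (captured) {
          captured.add(record.getMessage());
        }
      }
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
    }
  };

  ScopedLogCapture() {
    handler.setLevel(Level.WARNING);
    rootLogger.addHandler(handler); // corresponds to setupLogAppender()
  }

  List<String> getCaptured() {
    synchronized (captured) {
      return new ArrayList<>(captured);
    }
  }

  @Override
  public void close() {
    rootLogger.removeHandler(handler); // corresponds to removeLogAppender()
  }
}
```

A caller would then wrap the job in `try (ScopedLogCapture capture = new ScopedLogCapture()) { ... }`, and the handler is removed even if the job throws.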

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/troubleshooter/AutomaticTroubleshooter.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.troubleshooter;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.commons.text.TextStringBuilder;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Automatic troubleshooter will identify and prioritize the problems with the job, and display a summary to the user.
+ *
+ * Troubleshooter will collect errors & warnings from logs and combine them with various health checks. After that
+ * you can {@link #refineIssues()} to prioritize them and filter out noise, and then {@link #logIssueSummary()}
+ * to show a human-readable list of issues.
+ *
+ * Implementation and architecture notes:
+ *
+ * We convert log messages and health check results to {@link Issue}s. They will be shown to the user at the end of
+ * the job log. To avoid overwhelming the user, we will only collect a fixed number of issues, and will de-duplicate
+ * them, so that each type of problem is shown only once.
+ *
+ * Issues will be emitted in GobblinTrackingEvents at the end of the job, so that they can be collected by Gobblin
+ * service, and used for future platform-wide analysis.
+ *
+ * */
+@Slf4j
+@Singleton
+public class AutomaticTroubleshooter extends AbstractIdleService {
+
+  private final IssueRepository issueRepository;
+  private AutoTroubleshooterLogAppender troubleshooterLogger;
+
+  @Inject
+  public AutomaticTroubleshooter(IssueRepository issueRepository) {
+    this.issueRepository = Objects.requireNonNull(issueRepository);
+  }
+
+  @Override
+  protected void startUp()
+      throws Exception {
+    setupLogAppender();
+  }
+
+  @Override
+  protected void shutDown()
+      throws Exception {
+    removeLogAppender();
+  }
+
+  private void setupLogAppender() {
+    org.apache.log4j.Logger rootLogger = LogManager.getRootLogger();
+
+    troubleshooterLogger = new AutoTroubleshooterLogAppender(issueRepository);
+    troubleshooterLogger.setThreshold(Level.WARN);
+    troubleshooterLogger.activateOptions();
+    rootLogger.addAppender(troubleshooterLogger);
+
+    log.info("Configured logger for automatic troubleshooting");
+  }
+
+  private void removeLogAppender() {
+    org.apache.log4j.Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.removeAppender(troubleshooterLogger);
+    log.info("Removed logger for automatic troubleshooting. Processed {} events.",
+             troubleshooterLogger.getProcessedEventCount());
+  }
+
+  /**
+   * This method will sort, filter and enhance the list of issues to make it more meaningful for the user.
+   */
+  public void refineIssues()
+      throws TroubleshooterException {
+    IssueRefinery refinery = new DefaultIssueRefinery();
+
+    List<Issue> issues = issueRepository.getAll();
+    issues.sort(Comparator.comparing(Issue::getSeverity).reversed().thenComparing(Issue::getTime));

Review comment:
       Maybe this sorting logic should be included in the issue refinery instead of here, since sorting/refining seems to be the job of the issue refinery.
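A sketch of that suggestion, assuming simplified stand-ins for `Issue` (severity, timestamp, summary) and for the refinery contract: the comparator from `refineIssues()` moves into the refinery, so the troubleshooter only fetches issues and hands them over. `IssueLevel`, `TimedIssue`, and `SortingRefinery` are hypothetical names.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Stand-in severity; constants are declared from lowest to highest, so natural order ranks them.
enum IssueLevel { INFO, WARN, ERROR }

// Stand-in for Gobblin's Issue, reduced to what the ordering needs.
final class TimedIssue {
  private final IssueLevel severity;
  private final Instant time;
  private final String summary;

  TimedIssue(IssueLevel severity, Instant time, String summary) {
    this.severity = severity;
    this.time = time;
    this.summary = summary;
  }

  IssueLevel getSeverity() { return severity; }
  Instant getTime() { return time; }
  String getSummary() { return summary; }
}

// Hypothetical refinery that owns the ordering previously done in refineIssues().
final class SortingRefinery {
  // Most severe first; ties broken by time, oldest first (same comparator as in the diff).
  private static final Comparator<TimedIssue> ORDER =
      Comparator.comparing(TimedIssue::getSeverity).reversed().thenComparing(TimedIssue::getTime);

  List<TimedIssue> refine(List<TimedIssue> sourceIssues) {
    List<TimedIssue> issues = new ArrayList<>(sourceIssues);
    issues.sort(ORDER);
    // Filtering rules (e.g. hiding Kafka warnings) would be applied here as well.
    return issues;
  }
}
```

`refineIssues()` would then reduce to passing `issueRepository.getAll()` to the refinery and storing the result.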

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -189,6 +197,10 @@ public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> metadataT
     clusterNameTags.addAll(Tag.fromMap(ClusterNameTags.getClusterNameTags()));
     GobblinMetrics.addCustomTagsToProperties(jobProps, clusterNameTags);
 
+    issueRepository = new InMemoryIssueRepository();
+    troubleshooter = new AutomaticTroubleshooter(issueRepository);

Review comment:
       Instead of enabling this in general for all Gobblin jobs, should we consider making this troubleshooter opt-in to start with, or at least have a config to disable it? Just in case it causes unexpected performance issues, floods the logs too much, sends too many events, etc.
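A minimal sketch of an opt-in switch, assuming the launcher reads it from the job `Properties` it already receives. The key `troubleshooter.enabled` and the default of `false` are hypothetical choices for illustration, not an existing Gobblin configuration.

```java
import java.util.Properties;

final class TroubleshooterSwitch {
  // Hypothetical configuration key; not an existing Gobblin setting.
  static final String ENABLED_KEY = "troubleshooter.enabled";
  // Opt-in to start with: when the key is absent, the troubleshooter stays off.
  static final boolean ENABLED_BY_DEFAULT = false;

  static boolean isEnabled(Properties jobProps) {
    String value = jobProps.getProperty(ENABLED_KEY);
    if (value == null) {
      return ENABLED_BY_DEFAULT;
    }
    // Boolean.parseBoolean is case-insensitive and treats anything other than "true" as false.
    return Boolean.parseBoolean(value.trim());
  }
}
```

The launcher could then create and start the troubleshooter only when `TroubleshooterSwitch.isEnabled(jobProps)` returns true, and later flip the default once the feature has proven itself.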

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ThrowableWithErrorCode.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+public interface ThrowableWithErrorCode {
+
+  /**
+   * Returns an error code that uniquely identifies the problem in the given context.
+   *
+   * The error code will be used programmatically, to take different recovery actions.
+   * */
+  String getErrorCode();

Review comment:
       Maybe include a couple of example error codes in the javadoc; from this it's not quite clear whether an error code would be just a number, a single word, a short description of the issue, etc. I guess any of those would work the same, but it would probably be good to have a standard.
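As an illustration of one possible standard (an assumption, not something the PR defines): short, stable, upper-snake-case identifiers that code can compare, with the human-readable explanation left to the exception message. The interface is redeclared here so the sketch compiles on its own; the exception class, the example codes, and the `ErrorCodes.findErrorCode` helper are hypothetical.

```java
import java.util.Optional;

// Mirrors ThrowableWithErrorCode from the PR so this sketch is self-contained.
interface ThrowableWithErrorCode {
  /**
   * Returns an error code that uniquely identifies the problem in the given context,
   * for example {@code "SOURCE_SCHEMA_MISMATCH"} or {@code "DESTINATION_QUOTA_EXCEEDED"}.
   */
  String getErrorCode();
}

// Hypothetical exception carrying one of those codes.
final class DestinationQuotaExceededException extends RuntimeException
    implements ThrowableWithErrorCode {
  DestinationQuotaExceededException(String message) {
    super(message);
  }

  @Override
  public String getErrorCode() {
    return "DESTINATION_QUOTA_EXCEEDED";
  }
}

final class ErrorCodes {
  // Walks the cause chain and returns the first error code found, if any.
  static Optional<String> findErrorCode(Throwable t) {
    for (Throwable current = t; current != null; current = current.getCause()) {
      if (current instanceof ThrowableWithErrorCode) {
        return Optional.of(((ThrowableWithErrorCode) current).getErrorCode());
      }
    }
    return Optional.empty();
  }
}
```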




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

