Repository: incubator-gobblin
Updated Branches:
  refs/heads/master b89706257 -> 707c1e4e5


[GOBBLIN-256] Improve logging for gobblin compaction

Closes #2108 from yukuai518/loggin


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/707c1e4e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/707c1e4e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/707c1e4e

Branch: refs/heads/master
Commit: 707c1e4e522aebef91cc2a8aa8d12787f2738e50
Parents: b897062
Author: Kuai Yu <[email protected]>
Authored: Tue Sep 19 17:04:38 2017 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Sep 19 17:04:38 2017 -0700

----------------------------------------------------------------------
 .../compaction/mapreduce/MRCompactionTask.java  |  2 +-
 .../compaction/source/CompactionFailedTask.java |  5 ++-
 .../compaction/source/CompactionSource.java     | 31 ++++++++++++++----
 .../verify/CompactionAuditCountVerifier.java    | 34 +++++++++-----------
 .../verify/CompactionThresholdVerifier.java     | 21 ++++++------
 .../verify/CompactionTimeRangeVerifier.java     | 19 ++++++-----
 .../compaction/verify/CompactionVerifier.java   | 21 ++++++++++--
 .../verify/PinotAuditCountVerifierTest.java     |  6 ++--
 8 files changed, 88 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
index 386e62a..be71804 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java
@@ -68,7 +68,7 @@ public class MRCompactionTask extends MRTask {
   public void run() {
     List<CompactionVerifier> verifiers = this.suite.getMapReduceVerifiers();
     for (CompactionVerifier verifier : verifiers) {
-      if (!verifier.verify(dataset)) {
+      if (!verifier.verify(dataset).isSuccessful()) {
         log.error("Verification {} for {} is not passed.", verifier.getName(), 
dataset.datasetURN());
         this.onMRTaskComplete (false, new IOException("Compaction verification 
for MR is failed"));
         return;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java
index d6e2983..3fdf6bd 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.compaction.suite.CompactionSuite;
 import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.runtime.TaskContext;
@@ -35,17 +36,19 @@ import org.apache.gobblin.runtime.task.TaskIFace;
 public class CompactionFailedTask extends FailedTask {
   protected final CompactionSuite suite;
   protected final Dataset dataset;
+  protected final String failedReason;
 
   public CompactionFailedTask (TaskContext taskContext) {
     super(taskContext);
     this.suite = CompactionSuiteUtils.getCompactionSuiteFactory 
(taskContext.getTaskState()).
         createSuite(taskContext.getTaskState());
     this.dataset = this.suite.load(taskContext.getTaskState());
+    this.failedReason = 
taskContext.getTaskState().getProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON);
   }
 
   @Override
   public void run() {
-    log.error ("Compaction job for " + dataset.datasetURN() + " is failed. 
Please take a look");
+    log.error ("Compaction job for " + dataset.datasetURN() + " is failed 
because of {}", failedReason);
     this.workingState = WorkUnitState.WorkingState.FAILED;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 4c037af..4e8d3e0 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -25,7 +25,9 @@ import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
 import org.apache.gobblin.config.ConfigBuilder;
@@ -68,7 +70,6 @@ import 
org.apache.gobblin.util.request_allocation.ResourcePool;
 
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -77,6 +78,7 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -152,6 +154,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
         long timeOutInMinute = 
this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_TIMEOUT_MINUTES,
 30);
         long iterationCountLimit = 
this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT,
 Integer.MAX_VALUE);
         long iteration = 0;
+        Map<String, String> failedReasonMap = null;
         while (datasets.size() > 0 && iteration++ < iterationCountLimit) {
           Iterator<Callable<VerifiedDataset>> verifierIterator =
                   Iterators.transform (datasets.iterator(), new 
Function<Dataset, Callable<VerifiedDataset>>() {
@@ -165,22 +168,24 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
                   ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of("Verifier-compaction-dataset-pool-%d")));
 
           List<Dataset> failedDatasets = Lists.newArrayList();
+          failedReasonMap = Maps.newHashMap();
 
           List<Either<VerifiedDataset, ExecutionException>> futures = 
executor.executeAndGetResults();
           for (Either<VerifiedDataset, ExecutionException> either: futures) {
             if (either instanceof Either.Right) {
               ExecutionException exc = ((Either.Right<VerifiedDataset, 
ExecutionException>) either).getRight();
               DatasetVerificationException dve = 
(DatasetVerificationException) exc.getCause();
-              log.error ("Verification raised an exception:" + 
ExceptionUtils.getStackTrace(dve.cause));
               failedDatasets.add(dve.dataset);
+              failedReasonMap.put(dve.dataset.getUrn(), 
ExceptionUtils.getFullStackTrace(dve.cause));
             } else {
               VerifiedDataset vd = ((Either.Left<VerifiedDataset, 
ExecutionException>) either).getLeft();
               if (!vd.verifiedResult.allVerificationPassed) {
                 if (vd.verifiedResult.shouldRetry) {
-                  log.error ("Dataset {} verification has failure but should 
retry", vd.dataset.datasetURN());
+                  log.debug ("Dataset {} verification has failure but should 
retry", vd.dataset.datasetURN());
                   failedDatasets.add(vd.dataset);
+                  failedReasonMap.put(vd.dataset.getUrn(), 
vd.verifiedResult.failedReason);
                 } else {
-                  log.error ("Dataset {} verification has failure but no need 
to retry", vd.dataset.datasetURN());
+                  log.debug ("Dataset {} verification has failure but no need 
to retry", vd.dataset.datasetURN());
                 }
               }
             }
@@ -196,7 +201,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
           for (Dataset dataset: datasets) {
             log.info ("{} is timed out and give up the verification, adding a 
failed task", dataset.datasetURN());
             // create failed task for these failed datasets
-            this.workUnitIterator.addWorkUnit 
(createWorkUnitForFailure(dataset));
+            this.workUnitIterator.addWorkUnit 
(createWorkUnitForFailure(dataset, failedReasonMap.get(dataset.getUrn())));
           }
         }
 
@@ -272,6 +277,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
   private static class VerifiedResult {
     private boolean allVerificationPassed;
     private boolean shouldRetry;
+    private String failedReason;
   }
 
   @AllArgsConstructor
@@ -299,10 +305,13 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
     public VerifiedResult verify (Dataset dataset) throws Exception {
       boolean verificationPassed = true;
       boolean shouldRetry = true;
+      String failedReason = "";
       if (verifiers != null) {
         for (CompactionVerifier verifier : verifiers) {
-          if (!verifier.verify (dataset)) {
+          CompactionVerifier.Result rst = verifier.verify (dataset);
+          if (!rst.isSuccessful()) {
             verificationPassed = false;
+            failedReason = rst.getFailureReason();
             // Not all verification should be retried. Below are verifications 
which
             // doesn't need retry. If any of then failed, we simply skip this 
dataset.
             if (!verifier.isRetriable()) {
@@ -313,7 +322,7 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
         }
       }
 
-      return new VerifiedResult(verificationPassed, shouldRetry);
+      return new VerifiedResult(verificationPassed, shouldRetry, failedReason);
     }
   }
 
@@ -401,6 +410,14 @@ public class CompactionSource implements 
WorkUnitStreamSource<String, String> {
     return workUnit;
   }
 
+  protected WorkUnit createWorkUnitForFailure (Dataset dataset, String reason) 
throws IOException {
+    WorkUnit workUnit = new FailedTask.FailedWorkUnit();
+    workUnit.setProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON, 
reason);
+    TaskUtils.setTaskFactoryClass(workUnit, 
CompactionFailedTask.CompactionFailedTaskFactory.class);
+    suite.save (dataset, workUnit);
+    return workUnit;
+  }
+
   @Override
   public Extractor getExtractor (WorkUnitState state) throws IOException {
     throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index 5700e68..ebfe0e6 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.compaction.verify;
 
 import com.google.common.base.Splitter;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.gobblin.compaction.audit.AuditCountClient;
 import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -112,10 +114,10 @@ public class CompactionAuditCountVerifier implements 
CompactionVerifier<FileSyst
    * @param dataset Dataset needs to be verified
    * @return If verification is succeeded
    */
-  public boolean verify (FileSystemDataset dataset) {
+  public Result verify (FileSystemDataset dataset) {
     if (auditCountClient == null) {
       log.debug("No audit count client specified, skipped");
-      return true;
+      return new Result(true, "");
     }
 
     CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(this.state).parse(dataset);
@@ -125,16 +127,16 @@ public class CompactionAuditCountVerifier implements 
CompactionVerifier<FileSyst
     try {
       Map<String, Long> countsByTier = auditCountClient.fetch (datasetName, 
startTime.getMillis(), endTime.getMillis());
       for (String tier: referenceTiers) {
-        if (passed (datasetName, countsByTier, tier)) {
-          return true;
+        Result rst = passed (datasetName, countsByTier, tier);
+        if (rst.isSuccessful()) {
+          return new Result(true, "");
         }
       }
     } catch (IOException e) {
-      log.error(e.toString());
+      return new Result(false, ExceptionUtils.getFullStackTrace(e));
     }
 
-    log.warn ("Audit count verification failed for {} between {} and {}", 
datasetName, startTime, endTime);
-    return false;
+    return new Result(false, String.format("%s data is not complete between %s 
and %s", datasetName, startTime, endTime));
   }
 
   /**
@@ -144,31 +146,27 @@ public class CompactionAuditCountVerifier implements 
CompactionVerifier<FileSyst
    * @param referenceTier the tiers we wants to compare against
    * @return If any of (gobblin/refenence) >= threshold, return true, else 
return false
    */
-  private boolean passed (String datasetName, Map<String, Long> countsByTier, 
String referenceTier) {
+  private Result passed (String datasetName, Map<String, Long> countsByTier, 
String referenceTier) {
     if (!countsByTier.containsKey(this.gobblinTier)) {
-      log.error (String
-              .format("Failed to get audit count for topic %s, tier %s", 
datasetName, this.gobblinTier));
-      return false;
+      return new Result(false, String.format("Failed to get audit count for 
topic %s, tier %s", datasetName, this.gobblinTier));
     }
     if (!countsByTier.containsKey(referenceTier)) {
-      log.error (String.format("Failed to get audit count for topic %s, tier 
%s", datasetName, referenceTier));
-      return false;
+      return new Result(false, String.format("Failed to get audit count for 
topic %s, tier %s", datasetName, referenceTier));
     }
 
     long refCount = countsByTier.get(referenceTier);
     long gobblinCount = countsByTier.get(this.gobblinTier);
 
     if ((double) gobblinCount / (double) refCount < this.threshold) {
-      log.warn (String.format("Verification failed for %s : gobblin count = 
%d, %s count = %d (%f)",
-              datasetName, gobblinCount, referenceTier, refCount, (double) 
gobblinCount / (double) refCount));
-      return false;
+      return new Result (false, String.format("%s failed for %s : gobblin 
count = %d, %s count = %d (%f < threshold %f)",
+              this.getName(), datasetName, gobblinCount, referenceTier, 
refCount, (double) gobblinCount / (double) refCount, this.threshold));
     }
 
-    return true;
+    return new Result(true, "");
   }
 
   public String getName() {
-    return this.getClass().getName() + "(" + 
this.auditCountClient.getClass().getName() + ")";
+    return this.getClass().getName();
   }
 
   private static class EmptyAuditCountClientFactory implements 
AuditCountClientFactory {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index fbd6413..67bb63a 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.compaction.verify;
 
 
 import com.google.common.collect.Lists;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
 import 
org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
@@ -36,7 +38,6 @@ import java.util.Map;
  */
 @Slf4j
 public class CompactionThresholdVerifier implements 
CompactionVerifier<FileSystemDataset> {
-  public final static String COMPACTION_VERIFIER_THRESHOLD = 
"compaction-verifier-threshold";
   private final State state;
 
   /**
@@ -59,7 +60,7 @@ public class CompactionThresholdVerifier implements 
CompactionVerifier<FileSyste
    *
    * @return true iff the difference exceeds the threshold or this is the 
first time compaction
    */
-  public boolean verify (FileSystemDataset dataset) {
+  public Result verify (FileSystemDataset dataset) {
 
     Map<String, Double> thresholdMap = RecompactionConditionBasedOnRatio.
             getDatasetRegexAndRecompactThreshold 
(state.getProp(MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET,
@@ -68,32 +69,32 @@ public class CompactionThresholdVerifier implements 
CompactionVerifier<FileSyste
     CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(state).parse(dataset);
 
     double threshold = 
RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName 
(result.getDatasetName(), thresholdMap);
-    log.info ("Threshold is {} for dataset {}", threshold, 
result.getDatasetName());
+    log.debug ("Threshold is {} for dataset {}", threshold, 
result.getDatasetName());
 
     InputRecordCountHelper helper = new InputRecordCountHelper(state);
     try {
       double newRecords = helper.calculateRecordCount (Lists.newArrayList(new 
Path(dataset.datasetURN())));
       double oldRecords = helper.readRecordCount (new 
Path(result.getDstAbsoluteDir()));
 
-      log.info ("Dataset {} : previous records {}, current records {}", 
dataset.datasetURN(), oldRecords, newRecords);
       if (oldRecords == 0) {
-        return true;
+        return new Result(true, "");
       }
       if ((newRecords - oldRecords) / oldRecords > threshold) {
-        log.info ("Dataset {} records exceeded the threshold {}", 
dataset.datasetURN(), threshold);
-        return true;
+        log.debug ("Dataset {} records exceeded the threshold {}", 
dataset.datasetURN(), threshold);
+        return new Result(true, "");
       }
+
+      return new Result(false, String.format("%s is failed for dataset %s. 
Prev=%f, Cur=%f, not reaching to threshold %f", this.getName(), 
result.getDatasetName(), oldRecords, newRecords, threshold));
     } catch (IOException e) {
-      log.error(e.toString());
+      return new Result(false, ExceptionUtils.getFullStackTrace(e));
     }
-    return false;
   }
 
   /**
    * Get compaction threshold verifier name
    */
   public String getName() {
-    return COMPACTION_VERIFIER_THRESHOLD;
+    return this.getClass().getName();
   }
 
   public boolean isRetriable () {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index aefff8d..85eca40 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.compaction.verify;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
@@ -42,8 +43,9 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
 
   protected State state;
 
-  public boolean verify (FileSystemDataset dataset) {
-
+  public Result verify (FileSystemDataset dataset) {
+    final DateTime earliest;
+    final DateTime latest;
     try {
       CompactionPathParser.CompactionParserResult result = new 
CompactionPathParser(state).parse(dataset);
       DateTime folderTime = result.getTime();
@@ -55,21 +57,22 @@ public class CompactionTimeRangeVerifier implements 
CompactionVerifier<FileSyste
       // get earliest time
       String maxTimeAgoStr = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO,
 TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO);
       Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr);
-      DateTime earliest = current.minus(maxTimeAgo);
+      earliest = current.minus(maxTimeAgo);
 
       // get latest time
       String minTimeAgoStr = 
this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO,
 TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO);
       Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr);
-      DateTime latest = current.minus(minTimeAgo);
+      latest = current.minus(minTimeAgo);
 
       if (earliest.isBefore(folderTime) && latest.isAfter(folderTime)) {
-        log.info("{} falls in the user defined time range", 
dataset.datasetRoot());
-        return true;
+        log.debug("{} falls in the user defined time range", 
dataset.datasetRoot());
+        return new Result(true, "");
       }
     } catch (Exception e) {
-      log.error("{} cannot be verified because of {}", dataset.datasetRoot(), 
e.toString());
+      log.error("{} cannot be verified because of {}", dataset.datasetRoot(), 
ExceptionUtils.getFullStackTrace(e));
+      return new Result(false, e.toString());
     }
-    return false;
+    return new Result(false, dataset.datasetRoot() + " is not in between " + 
earliest + " and " + latest);
   }
 
   public String getName() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
index 09e996a..25573f6 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
@@ -18,15 +18,30 @@ package org.apache.gobblin.compaction.verify;
 
 import org.apache.gobblin.dataset.Dataset;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
 /**
  * An interface which represents a generic verifier for compaction
  */
 public interface CompactionVerifier<D extends Dataset> {
+
+   @Getter
+   @AllArgsConstructor
+   class Result {
+      boolean isSuccessful;
+      String failureReason;
+   }
+
    String COMPACTION_VERIFIER_PREFIX = "compaction-verifier-";
    String COMPACTION_VERIFICATION_TIMEOUT_MINUTES = 
"compaction.verification.timeoutMinutes";
    String COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT = 
"compaction.verification.iteration.countLimit";
-   String COMPACTION_VERIFICATION_THREADS= "compaction.verification.threads";
-   boolean verify(D dataset);
+   String COMPACTION_VERIFICATION_THREADS = "compaction.verification.threads";
+   String COMPACTION_VERIFICATION_FAIL_REASON = 
"compaction.verification.fail.reason";
+
+   Result verify (D dataset);
+
    boolean isRetriable ();
-   String getName();
+   String getName ();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
index 6bda9f4..f876811 100644
--- 
a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
+++ 
b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
@@ -81,7 +81,7 @@ public class PinotAuditCountVerifierTest {
            GOBBLIN_TIER,  1000L
     ));
 
-    Assert.assertTrue (verifier.verify(dataset));
+    Assert.assertTrue (verifier.verify(dataset).isSuccessful);
 
     // test true because GOBBLIN_TIER / PRODUCER_TIER is above threshold
     client.setCounts(ImmutableMap.of(
@@ -89,7 +89,7 @@ public class PinotAuditCountVerifierTest {
             ORIGIN_TIER,   1100L,
             GOBBLIN_TIER,  1000L
     ));
-    Assert.assertTrue (verifier.verify(dataset));
+    Assert.assertTrue (verifier.verify(dataset).isSuccessful);
 
 
     // test false because GOBBLIN_TIER / (PRODUCER_TIER || ORIGIN_TIER) is 
below threshold
@@ -98,7 +98,7 @@ public class PinotAuditCountVerifierTest {
             ORIGIN_TIER,   1100L,
             GOBBLIN_TIER,  1000L
     ));
-    Assert.assertFalse (verifier.verify(dataset));
+    Assert.assertFalse (verifier.verify(dataset).isSuccessful);
   }
 
 

Reply via email to