Repository: incubator-gobblin Updated Branches: refs/heads/master b89706257 -> 707c1e4e5
[GOBBLIN-256] Improve logging for gobblin compaction Closes #2108 from yukuai518/loggin Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/707c1e4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/707c1e4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/707c1e4e Branch: refs/heads/master Commit: 707c1e4e522aebef91cc2a8aa8d12787f2738e50 Parents: b897062 Author: Kuai Yu <[email protected]> Authored: Tue Sep 19 17:04:38 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Sep 19 17:04:38 2017 -0700 ---------------------------------------------------------------------- .../compaction/mapreduce/MRCompactionTask.java | 2 +- .../compaction/source/CompactionFailedTask.java | 5 ++- .../compaction/source/CompactionSource.java | 31 ++++++++++++++---- .../verify/CompactionAuditCountVerifier.java | 34 +++++++++----------- .../verify/CompactionThresholdVerifier.java | 21 ++++++------ .../verify/CompactionTimeRangeVerifier.java | 19 ++++++----- .../compaction/verify/CompactionVerifier.java | 21 ++++++++++-- .../verify/PinotAuditCountVerifierTest.java | 6 ++-- 8 files changed, 88 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java index 386e62a..be71804 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java @@ -68,7 +68,7 @@ public class MRCompactionTask extends MRTask { public void run() { List<CompactionVerifier> verifiers = this.suite.getMapReduceVerifiers(); for (CompactionVerifier verifier : verifiers) { - if (!verifier.verify(dataset)) { + if (!verifier.verify(dataset).isSuccessful()) { log.error("Verification {} for {} is not passed.", verifier.getName(), dataset.datasetURN()); this.onMRTaskComplete (false, new IOException("Compaction verification for MR is failed")); return; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java index d6e2983..3fdf6bd 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionFailedTask.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.compaction.suite.CompactionSuite; import org.apache.gobblin.compaction.suite.CompactionSuiteUtils; +import org.apache.gobblin.compaction.verify.CompactionVerifier; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.dataset.Dataset; import org.apache.gobblin.runtime.TaskContext; @@ -35,17 +36,19 @@ import org.apache.gobblin.runtime.task.TaskIFace; public class CompactionFailedTask extends FailedTask { protected final CompactionSuite suite; protected final Dataset dataset; + protected final String failedReason; public CompactionFailedTask (TaskContext taskContext) { super(taskContext); this.suite = CompactionSuiteUtils.getCompactionSuiteFactory (taskContext.getTaskState()). createSuite(taskContext.getTaskState()); this.dataset = this.suite.load(taskContext.getTaskState()); + this.failedReason = taskContext.getTaskState().getProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON); } @Override public void run() { - log.error ("Compaction job for " + dataset.datasetURN() + " is failed. Please take a look"); + log.error ("Compaction job for " + dataset.datasetURN() + " is failed because of {}", failedReason); this.workingState = WorkUnitState.WorkingState.FAILED; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java index 4c037af..4e8d3e0 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java @@ -25,7 +25,9 @@ import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.compaction.suite.CompactionSuiteUtils; import org.apache.gobblin.config.ConfigBuilder; @@ -68,7 +70,6 @@ import org.apache.gobblin.util.request_allocation.ResourcePool; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -77,6 +78,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import java.net.URI; +import java.util.Map; import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.Callable; @@ -152,6 +154,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { long timeOutInMinute = this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_TIMEOUT_MINUTES, 30); long iterationCountLimit = this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT, Integer.MAX_VALUE); long iteration = 0; + Map<String, String> failedReasonMap = null; while (datasets.size() > 0 && iteration++ < iterationCountLimit) { Iterator<Callable<VerifiedDataset>> verifierIterator = Iterators.transform (datasets.iterator(), new Function<Dataset, Callable<VerifiedDataset>>() { @@ -165,22 +168,24 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Verifier-compaction-dataset-pool-%d"))); List<Dataset> failedDatasets = Lists.newArrayList(); + failedReasonMap = Maps.newHashMap(); List<Either<VerifiedDataset, ExecutionException>> futures = executor.executeAndGetResults(); for (Either<VerifiedDataset, ExecutionException> either: futures) { if (either instanceof Either.Right) { ExecutionException exc = ((Either.Right<VerifiedDataset, ExecutionException>) either).getRight(); DatasetVerificationException dve = (DatasetVerificationException) exc.getCause(); - log.error ("Verification raised an exception:" + ExceptionUtils.getStackTrace(dve.cause)); failedDatasets.add(dve.dataset); + failedReasonMap.put(dve.dataset.getUrn(), ExceptionUtils.getFullStackTrace(dve.cause)); } else { VerifiedDataset vd = ((Either.Left<VerifiedDataset, ExecutionException>) either).getLeft(); if (!vd.verifiedResult.allVerificationPassed) { if (vd.verifiedResult.shouldRetry) { - log.error ("Dataset {} verification has failure but should retry", vd.dataset.datasetURN()); + log.debug ("Dataset {} verification has failure but should retry", vd.dataset.datasetURN()); failedDatasets.add(vd.dataset); + failedReasonMap.put(vd.dataset.getUrn(), vd.verifiedResult.failedReason); } else { - log.error ("Dataset {} verification has failure but no need to retry", vd.dataset.datasetURN()); + log.debug ("Dataset {} verification has failure but no need to retry", vd.dataset.datasetURN()); } } } @@ -196,7 +201,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { for (Dataset dataset: datasets) { log.info ("{} is timed out and give up the verification, adding a failed task", dataset.datasetURN()); // create failed task for these failed datasets - this.workUnitIterator.addWorkUnit (createWorkUnitForFailure(dataset)); + this.workUnitIterator.addWorkUnit (createWorkUnitForFailure(dataset, failedReasonMap.get(dataset.getUrn()))); } } @@ -272,6 +277,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { private static class VerifiedResult { private boolean allVerificationPassed; private boolean shouldRetry; + private String failedReason; } @AllArgsConstructor @@ -299,10 +305,13 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { public VerifiedResult verify (Dataset dataset) throws Exception { boolean verificationPassed = true; boolean shouldRetry = true; + String failedReason = ""; if (verifiers != null) { for (CompactionVerifier verifier : verifiers) { - if (!verifier.verify (dataset)) { + CompactionVerifier.Result rst = verifier.verify (dataset); + if (!rst.isSuccessful()) { verificationPassed = false; + failedReason = rst.getFailureReason(); // Not all verification should be retried. Below are verifications which // doesn't need retry. If any of then failed, we simply skip this dataset. if (!verifier.isRetriable()) { @@ -313,7 +322,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { } } - return new VerifiedResult(verificationPassed, shouldRetry); + return new VerifiedResult(verificationPassed, shouldRetry, failedReason); } } @@ -401,6 +410,14 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> { return workUnit; } + protected WorkUnit createWorkUnitForFailure (Dataset dataset, String reason) throws IOException { + WorkUnit workUnit = new FailedTask.FailedWorkUnit(); + workUnit.setProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON, reason); + TaskUtils.setTaskFactoryClass(workUnit, CompactionFailedTask.CompactionFailedTaskFactory.class); + suite.save (dataset, workUnit); + return workUnit; + } + @Override public Extractor getExtractor (WorkUnitState state) throws IOException { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java index 5700e68..ebfe0e6 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java @@ -18,6 +18,8 @@ package org.apache.gobblin.compaction.verify; import com.google.common.base.Splitter; + +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.gobblin.compaction.audit.AuditCountClient; import org.apache.gobblin.compaction.audit.AuditCountClientFactory; import org.apache.gobblin.compaction.mapreduce.MRCompactor; @@ -112,10 +114,10 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst * @param dataset Dataset needs to be verified * @return If verification is succeeded */ - public boolean verify (FileSystemDataset dataset) { + public Result verify (FileSystemDataset dataset) { if (auditCountClient == null) { log.debug("No audit count client specified, skipped"); - return true; + return new Result(true, ""); } CompactionPathParser.CompactionParserResult result = new CompactionPathParser(this.state).parse(dataset); @@ -125,16 +127,16 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst try { Map<String, Long> countsByTier = auditCountClient.fetch (datasetName, startTime.getMillis(), endTime.getMillis()); for (String tier: referenceTiers) { - if (passed (datasetName, countsByTier, tier)) { - return true; + Result rst = passed (datasetName, countsByTier, tier); + if (rst.isSuccessful()) { + return new Result(true, ""); } } } catch (IOException e) { - log.error(e.toString()); + return new Result(false, ExceptionUtils.getFullStackTrace(e)); } - log.warn ("Audit count verification failed for {} between {} and {}", datasetName, startTime, endTime); - return false; + return new Result(false, String.format("%s data is not complete between %s and %s", datasetName, startTime, endTime)); } /** @@ -144,31 +146,27 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst * @param referenceTier the tiers we wants to compare against * @return If any of (gobblin/refenence) >= threshold, return true, else return false */ - private boolean passed (String datasetName, Map<String, Long> countsByTier, String referenceTier) { + private Result passed (String datasetName, Map<String, Long> countsByTier, String referenceTier) { if (!countsByTier.containsKey(this.gobblinTier)) { - log.error (String - .format("Failed to get audit count for topic %s, tier %s", datasetName, this.gobblinTier)); - return false; + return new Result(false, String.format("Failed to get audit count for topic %s, tier %s", datasetName, this.gobblinTier)); } if (!countsByTier.containsKey(referenceTier)) { - log.error (String.format("Failed to get audit count for topic %s, tier %s", datasetName, referenceTier)); - return false; + return new Result(false, String.format("Failed to get audit count for topic %s, tier %s", datasetName, referenceTier)); } long refCount = countsByTier.get(referenceTier); long gobblinCount = countsByTier.get(this.gobblinTier); if ((double) gobblinCount / (double) refCount < this.threshold) { - log.warn (String.format("Verification failed for %s : gobblin count = %d, %s count = %d (%f)", - datasetName, gobblinCount, referenceTier, refCount, (double) gobblinCount / (double) refCount)); - return false; + return new Result (false, String.format("%s failed for %s : gobblin count = %d, %s count = %d (%f < threshold %f)", + this.getName(), datasetName, gobblinCount, referenceTier, refCount, (double) gobblinCount / (double) refCount, this.threshold)); } - return true; + return new Result(true, ""); } public String getName() { - return this.getClass().getName() + "(" + this.auditCountClient.getClass().getName() + ")"; + return this.getClass().getName(); } private static class EmptyAuditCountClientFactory implements AuditCountClientFactory { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java index fbd6413..67bb63a 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java @@ -19,6 +19,8 @@ package org.apache.gobblin.compaction.verify; import com.google.common.collect.Lists; + +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.compaction.parser.CompactionPathParser; @@ -36,7 +38,6 @@ import java.util.Map; */ @Slf4j public class CompactionThresholdVerifier implements CompactionVerifier<FileSystemDataset> { - public final static String COMPACTION_VERIFIER_THRESHOLD = "compaction-verifier-threshold"; private final State state; /** @@ -59,7 +60,7 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste * * @return true iff the difference exceeds the threshold or this is the first time compaction */ - public boolean verify (FileSystemDataset dataset) { + public Result verify (FileSystemDataset dataset) { Map<String, Double> thresholdMap = RecompactionConditionBasedOnRatio. getDatasetRegexAndRecompactThreshold (state.getProp(MRCompactor.COMPACTION_LATEDATA_THRESHOLD_FOR_RECOMPACT_PER_DATASET, @@ -68,32 +69,32 @@ public class CompactionThresholdVerifier implements CompactionVerifier<FileSyste CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset); double threshold = RecompactionConditionBasedOnRatio.getRatioThresholdByDatasetName (result.getDatasetName(), thresholdMap); - log.info ("Threshold is {} for dataset {}", threshold, result.getDatasetName()); + log.debug ("Threshold is {} for dataset {}", threshold, result.getDatasetName()); InputRecordCountHelper helper = new InputRecordCountHelper(state); try { double newRecords = helper.calculateRecordCount (Lists.newArrayList(new Path(dataset.datasetURN()))); double oldRecords = helper.readRecordCount (new Path(result.getDstAbsoluteDir())); - log.info ("Dataset {} : previous records {}, current records {}", dataset.datasetURN(), oldRecords, newRecords); if (oldRecords == 0) { - return true; + return new Result(true, ""); } if ((newRecords - oldRecords) / oldRecords > threshold) { - log.info ("Dataset {} records exceeded the threshold {}", dataset.datasetURN(), threshold); - return true; + log.debug ("Dataset {} records exceeded the threshold {}", dataset.datasetURN(), threshold); + return new Result(true, ""); } + + return new Result(false, String.format("%s is failed for dataset %s. Prev=%f, Cur=%f, not reaching to threshold %f", this.getName(), result.getDatasetName(), oldRecords, newRecords, threshold)); } catch (IOException e) { - log.error(e.toString()); + return new Result(false, ExceptionUtils.getFullStackTrace(e)); } - return false; } /** * Get compaction threshold verifier name */ public String getName() { - return COMPACTION_VERIFIER_THRESHOLD; + return this.getClass().getName(); } public boolean isRetriable () { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java index aefff8d..85eca40 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java @@ -17,6 +17,7 @@ package org.apache.gobblin.compaction.verify; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.compaction.parser.CompactionPathParser; @@ -42,8 +43,9 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste protected State state; - public boolean verify (FileSystemDataset dataset) { - + public Result verify (FileSystemDataset dataset) { + final DateTime earliest; + final DateTime latest; try { CompactionPathParser.CompactionParserResult result = new CompactionPathParser(state).parse(dataset); DateTime folderTime = result.getTime(); @@ -55,21 +57,22 @@ public class CompactionTimeRangeVerifier implements CompactionVerifier<FileSyste // get earliest time String maxTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MAX_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MAX_TIME_AGO); Period maxTimeAgo = formatter.parsePeriod(maxTimeAgoStr); - DateTime earliest = current.minus(maxTimeAgo); + earliest = current.minus(maxTimeAgo); // get latest time String minTimeAgoStr = this.state.getProp(TimeBasedSubDirDatasetsFinder.COMPACTION_TIMEBASED_MIN_TIME_AGO, TimeBasedSubDirDatasetsFinder.DEFAULT_COMPACTION_TIMEBASED_MIN_TIME_AGO); Period minTimeAgo = formatter.parsePeriod(minTimeAgoStr); - DateTime latest = current.minus(minTimeAgo); + latest = current.minus(minTimeAgo); if (earliest.isBefore(folderTime) && latest.isAfter(folderTime)) { - log.info("{} falls in the user defined time range", dataset.datasetRoot()); - return true; + log.debug("{} falls in the user defined time range", dataset.datasetRoot()); + return new Result(true, ""); } } catch (Exception e) { - log.error("{} cannot be verified because of {}", dataset.datasetRoot(), e.toString()); + log.error("{} cannot be verified because of {}", dataset.datasetRoot(), ExceptionUtils.getFullStackTrace(e)); + return new Result(false, e.toString()); } - return false; + return new Result(false, dataset.datasetRoot() + " is not in between " + earliest + " and " + latest); } public String getName() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java index 09e996a..25573f6 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java @@ -18,15 +18,30 @@ package org.apache.gobblin.compaction.verify; import org.apache.gobblin.dataset.Dataset; +import lombok.AllArgsConstructor; +import lombok.Getter; + + /** * An interface which represents a generic verifier for compaction */ public interface CompactionVerifier<D extends Dataset> { + + @Getter + @AllArgsConstructor + class Result { + boolean isSuccessful; + String failureReason; + } + String COMPACTION_VERIFIER_PREFIX = "compaction-verifier-"; String COMPACTION_VERIFICATION_TIMEOUT_MINUTES = "compaction.verification.timeoutMinutes"; String COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT = "compaction.verification.iteration.countLimit"; - String COMPACTION_VERIFICATION_THREADS= "compaction.verification.threads"; - boolean verify(D dataset); + String COMPACTION_VERIFICATION_THREADS = "compaction.verification.threads"; + String COMPACTION_VERIFICATION_FAIL_REASON = "compaction.verification.fail.reason"; + + Result verify (D dataset); + boolean isRetriable (); - String getName(); + String getName (); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/707c1e4e/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java index 6bda9f4..f876811 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java @@ -81,7 +81,7 @@ public class PinotAuditCountVerifierTest { GOBBLIN_TIER, 1000L )); - Assert.assertTrue (verifier.verify(dataset)); + Assert.assertTrue (verifier.verify(dataset).isSuccessful); // test true because GOBBLIN_TIER / PRODUCER_TIER is above threshold client.setCounts(ImmutableMap.of( @@ -89,7 +89,7 @@ public class PinotAuditCountVerifierTest { ORIGIN_TIER, 1100L, GOBBLIN_TIER, 1000L )); - Assert.assertTrue (verifier.verify(dataset)); + Assert.assertTrue (verifier.verify(dataset).isSuccessful); // test false because GOBBLIN_TIER / (PRODUCER_TIER || ORIGIN_TIER) is below threshold @@ -98,7 +98,7 @@ public class PinotAuditCountVerifierTest { ORIGIN_TIER, 1100L, GOBBLIN_TIER, 1000L )); - Assert.assertFalse (verifier.verify(dataset)); + Assert.assertFalse (verifier.verify(dataset).isSuccessful); }
