Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 54bda2736 -> 5d0e944c3


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
index 6ade09e..182244a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java
@@ -24,8 +24,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.collect.Lists;
-import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,14 +36,15 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 import org.apache.gobblin.compaction.dataset.Dataset;
+import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.FileListUtils;
 
-
 /**
  * This class creates the following properties for a single MapReduce job for compaction:
  * compaction.topic, compaction.job.input.dir, compaction.job.dest.dir, compaction.job.dest.dir.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index 8a0599e..0ab3eab 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.compaction.mapreduce;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +28,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.math3.primes.Primes;
@@ -42,7 +40,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.joda.time.DateTime;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
index 5fcff75..7021a9b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java
@@ -23,7 +23,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapreduce.AvroJob;
 import org.apache.avro.mapreduce.AvroKeyRecordReader;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -31,10 +30,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
-import org.apache.gobblin.util.AvroUtils;
-
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
+import org.apache.gobblin.util.AvroUtils;
+
 
 /**
  * A subclass of {@link org.apache.avro.mapreduce.AvroKeyRecordReader}. The purpose is to add a constructor

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
index 93d4ed6..5f864cb 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java
@@ -17,21 +17,12 @@
 
 package org.apache.gobblin.compaction.mapreduce.avro;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroKey;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -47,9 +38,6 @@ import org.apache.hadoop.util.VersionInfo;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import org.apache.gobblin.util.AvroUtils;
-import org.apache.gobblin.util.FileListUtils;
-
 
 /**
  * A subclass of {@link org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat} for Avro inputfiles.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
index 8e508e5..5f2e9fd 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java
@@ -27,16 +27,15 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
 
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.node.ObjectNode;
-
 import lombok.extern.slf4j.Slf4j;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
index 2fc6c58..f5cac79 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java
@@ -17,22 +17,22 @@
 
 package org.apache.gobblin.compaction.parser;
 
-import com.google.common.base.Joiner;
-import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.AllArgsConstructor;
 import org.apache.commons.lang.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.FileSystemDataset;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index f11378f..4e4382b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -16,9 +16,28 @@
  */
 
 package org.apache.gobblin.compaction.source;
+
+import java.io.IOException;
+import java.net.URI;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTimeUtils;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
@@ -27,19 +46,23 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
-import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
-import org.apache.gobblin.config.ConfigBuilder;
-import org.apache.gobblin.data.management.dataset.DatasetUtils;
-import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder;
 import org.apache.gobblin.compaction.suite.CompactionSuite;
+import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.dataset.DatasetUtils;
+import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.DatasetsFinder;
 import org.apache.gobblin.runtime.JobState;
@@ -63,31 +86,9 @@ import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
 import org.apache.gobblin.util.request_allocation.RequestAllocator;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
 import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor;
 import org.apache.gobblin.util.request_allocation.ResourceEstimator;
 import org.apache.gobblin.util.request_allocation.ResourcePool;
 
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.joda.time.DateTimeUtils;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * A compaction source derived from {@link Source} which uses {@link DefaultFileSystemGlobFinder} to find all
  * {@link Dataset}s. Use {@link CompactionSuite#getDatasetsFinderVerifiers()} to guarantee a given dataset has passed

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
index 0142f6b..7b62671 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java
@@ -17,10 +17,20 @@
 
 package org.apache.gobblin.compaction.suite;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction;
-import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
 import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction;
+import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction;
 import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator;
 import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
 import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier;
@@ -28,14 +38,6 @@ import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier;
 import org.apache.gobblin.compaction.verify.CompactionVerifier;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.ArrayList;
 
 /**
  * A type of {@link CompactionSuite} which implements all components needed for avro file compaction.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
index 3c36ba5..1c564a6 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java
@@ -17,19 +17,19 @@
 
 package org.apache.gobblin.compaction.suite;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Job;
+
 import org.apache.gobblin.compaction.action.CompactionCompleteAction;
 import org.apache.gobblin.compaction.mapreduce.MRCompactionTask;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
 import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.copy.replication.ConfigBasedDatasetsFinder;
 import org.apache.gobblin.dataset.Dataset;
 
-import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.configuration.State;
-import org.apache.hadoop.mapreduce.Job;
-
-import java.io.IOException;
-import java.util.List;
-
 /**
  * This interface provides major components required by {@link org.apache.gobblin.compaction.source.CompactionSource}
  * and {@link org.apache.gobblin.compaction.mapreduce.MRCompactionTask} flow.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index ebfe0e6..5653281 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -17,9 +17,18 @@
 
 package org.apache.gobblin.compaction.verify;
 
-import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.joda.time.DateTime;
+
+import com.google.common.base.Splitter;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.audit.AuditCountClient;
 import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -27,12 +36,6 @@ import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
 import org.apache.gobblin.util.ClassAliasResolver;
-import lombok.extern.slf4j.Slf4j;
-import org.joda.time.DateTime;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
 
 /**
  * Use {@link AuditCountClient} to retrieve all record count across different tiers

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
index 67bb63a..a2751b9 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
@@ -17,21 +17,22 @@
 
 package org.apache.gobblin.compaction.verify;
 
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.Lists;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.Map;
 
 /**
  * Compare the source and destination avro records. Determine if a compaction is needed.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
index a267ab5..06abd0a 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java
@@ -18,19 +18,21 @@
 package org.apache.gobblin.compaction.verify;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Period;
+import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.compaction.parser.CompactionPathParser;
 import org.apache.gobblin.compaction.source.CompactionSource;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Period;
-import org.joda.time.format.PeriodFormatter;
-import org.joda.time.format.PeriodFormatterBuilder;
 
 /**
  * A simple class which verify current dataset belongs to a specific time range. Will skip to do

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
index 25573f6..68f57a7 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java
@@ -16,11 +16,11 @@
  */
 package org.apache.gobblin.compaction.verify;
 
-import org.apache.gobblin.dataset.Dataset;
-
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 
+import org.apache.gobblin.dataset.Dataset;
+
 
 /**
  * An interface which represents a generic verifier for compaction

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
index de95255..e1bc952 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java
@@ -16,8 +16,24 @@
  */
 package org.apache.gobblin.compaction.verify;
 
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.compaction.dataset.DatasetHelper;
 import org.apache.gobblin.compaction.event.CompactionSlaEventHelper;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
@@ -26,19 +42,6 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.RecordCountProvider;
 import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.util.Collection;
 
 /**
  * A class helps to calculate, serialize, deserialize record count.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
index 3fed3e5..51fe866 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java
@@ -17,20 +17,10 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
-import com.google.common.io.Files;
-import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
-import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
-import org.apache.gobblin.compaction.source.CompactionSource;
-import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
-import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories;
-import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
-import org.apache.gobblin.compaction.verify.CompactionVerifier;
-import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.data.management.copy.CopyConfiguration;
-import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
-import org.apache.gobblin.runtime.api.JobExecutionResult;
-import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
@@ -43,13 +33,23 @@ import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URI;
+import com.google.common.io.Files;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.compaction.audit.AuditCountClientFactory;
+import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
+import org.apache.gobblin.compaction.source.CompactionSource;
+import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories;
+import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier;
+import org.apache.gobblin.compaction.verify.CompactionVerifier;
+import org.apache.gobblin.compaction.verify.InputRecordCountHelper;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer;
+import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
 
 @Slf4j
 public class MRCompactionTaskTest {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
index 517b609..b677cd3 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java
@@ -15,23 +15,23 @@
  * limitations under the License.
  */
 package org.apache.gobblin.compaction.mapreduce;
-import org.apache.gobblin.compaction.dataset.Dataset;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
+import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import org.testng.Assert;
 
+import org.apache.gobblin.compaction.dataset.Dataset;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
index 150eee0..b243a8e 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.compaction.mapreduce.avro;
 
 import java.util.List;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.testng.Assert;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
index 8211710..3d51218 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java
@@ -20,26 +20,25 @@ package org.apache.gobblin.compaction.mapreduce.conditions;
 import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
-import org.joda.time.format.PeriodFormatterBuilder;
 import org.joda.time.format.PeriodFormatter;
+import org.joda.time.format.PeriodFormatterBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
-
-
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
-import org.apache.gobblin.compaction.conditions.RecompactionCondition;
+import com.google.common.collect.Lists;
+
 import org.apache.gobblin.compaction.conditions.RecompactionCombineCondition;
+import org.apache.gobblin.compaction.conditions.RecompactionCondition;
 import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnDuration;
 import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnFileCount;
 import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
index f876811..f9c855a 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java
@@ -16,19 +16,23 @@
  */
 package org.apache.gobblin.compaction.verify;
 
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.Setter;
+
 import org.apache.gobblin.compaction.audit.AuditCountClient;
 import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder;
 import org.apache.gobblin.compaction.mapreduce.MRCompactor;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import java.util.Map;
 
 /**
  * Class to test audit count verification logic

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
index df27f9f..3491013 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java
@@ -23,12 +23,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
@@ -36,6 +30,13 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobPattern;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.gobblin.data.management.retention.DatasetCleaner;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.DatasetsFinder;
@@ -62,9 +63,11 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
 
   public static final String DATASET_FINDER_PATTERN_KEY = CONFIGURATION_KEY_PREFIX + "dataset.pattern";
   public static final String DATASET_FINDER_BLACKLIST_KEY = CONFIGURATION_KEY_PREFIX + "dataset.blacklist";
+  public static final String DATASET_FINDER_GLOB_BLACKLIST_KEY = CONFIGURATION_KEY_PREFIX + "dataset.glob.blacklist";
 
   protected final Path datasetPattern;
   private final Optional<Pattern> blacklist;
+  private final Optional<Pattern> globPatternBlacklist;
   private final Path commonRoot;
   protected final FileSystem fs;
   protected final Properties props;
@@ -86,6 +89,12 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
       this.blacklist = Optional.absent();
     }
 
+    if (ConfigUtils.hasNonEmptyPath(config, DATASET_FINDER_GLOB_BLACKLIST_KEY)) {
+      this.globPatternBlacklist = Optional.of(GlobPattern.compile(config.getString(DATASET_FINDER_GLOB_BLACKLIST_KEY)));
+    } else {
+      this.globPatternBlacklist = Optional.absent();
+    }
+
     this.fs = fs;
 
     Path tmpDatasetPattern;
@@ -132,6 +141,9 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement
         if (this.blacklist.isPresent() && this.blacklist.get().matcher(pathToMatch.toString()).find()) {
           continue;
         }
+        if (this.globPatternBlacklist.isPresent() && this.globPatternBlacklist.get().matcher(pathToMatch.toString()).find()) {
+          continue;
+        }
         LOG.info("Found dataset at " + fileStatus.getPath());
         datasets.add(datasetAtPath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath())));
       }

Reply via email to