Repository: incubator-gobblin Updated Branches: refs/heads/master 54bda2736 -> 5d0e944c3
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java index 6ade09e..182244a 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobPropCreator.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import com.google.common.collect.Lists; -import org.apache.gobblin.compaction.dataset.DatasetHelper; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,14 +36,15 @@ import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.gobblin.compaction.dataset.Dataset; +import org.apache.gobblin.compaction.dataset.DatasetHelper; import org.apache.gobblin.compaction.event.CompactionSlaEventHelper; import org.apache.gobblin.configuration.State; import org.apache.gobblin.util.FileListUtils; - /** * This class creates the following properties for a single MapReduce job for compaction: * compaction.topic, compaction.job.input.dir, compaction.job.dest.dir, compaction.job.dest.dir. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java index 8a0599e..0ab3eab 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java @@ -19,7 +19,6 @@ package org.apache.gobblin.compaction.mapreduce; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -29,7 +28,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.commons.io.FilenameUtils; import org.apache.commons.math3.primes.Primes; @@ -42,7 +40,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.joda.time.DateTime; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java index 5fcff75..7021a9b 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyCombineFileRecordReader.java @@ -23,7 +23,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapreduce.AvroJob; import org.apache.avro.mapreduce.AvroKeyRecordReader; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.InputSplit; @@ -31,10 +30,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.gobblin.util.AvroUtils; - import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.gobblin.util.AvroUtils; + /** * A subclass of {@link org.apache.avro.mapreduce.AvroKeyRecordReader}. The purpose is to add a constructor http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java index 93d4ed6..5f864cb 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/AvroKeyRecursiveCombineFileInputFormat.java @@ -17,21 +17,12 @@ package org.apache.gobblin.compaction.mapreduce.avro; -import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.mapred.AvroKey; -import org.apache.commons.io.FilenameUtils; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -47,9 +38,6 @@ import org.apache.hadoop.util.VersionInfo; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.gobblin.util.AvroUtils; -import org.apache.gobblin.util.FileListUtils; - /** * A subclass of {@link org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat} for Avro inputfiles. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java index 8e508e5..5f2e9fd 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/avro/FieldAttributeBasedDeltaFieldsProvider.java @@ -27,16 +27,15 @@ import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.node.ObjectNode; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.node.ObjectNode; - import lombok.extern.slf4j.Slf4j; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java index 2fc6c58..f5cac79 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/parser/CompactionPathParser.java @@ -17,22 +17,22 @@ package org.apache.gobblin.compaction.parser; -import com.google.common.base.Joiner; -import org.apache.gobblin.dataset.FileSystemDataset; -import lombok.AllArgsConstructor; import org.apache.commons.lang.StringUtils; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.FileSystemDataset; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java index f11378f..4e4382b 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java @@ -16,9 +16,28 @@ */ package org.apache.gobblin.compaction.source; + +import java.io.IOException; +import java.net.URI; import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.joda.time.DateTimeUtils; import com.google.common.base.Function; import com.google.common.base.Optional; @@ -27,19 +46,23 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.commons.lang.exception.ExceptionUtils; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory; import org.apache.gobblin.compaction.mapreduce.MRCompactor; -import org.apache.gobblin.compaction.suite.CompactionSuiteUtils; -import org.apache.gobblin.config.ConfigBuilder; -import org.apache.gobblin.data.management.dataset.DatasetUtils; -import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder; import org.apache.gobblin.compaction.suite.CompactionSuite; +import org.apache.gobblin.compaction.suite.CompactionSuiteUtils; import org.apache.gobblin.compaction.verify.CompactionVerifier; -import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory; +import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.dataset.DatasetUtils; +import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder; +import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest; +import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor; import org.apache.gobblin.dataset.Dataset; import org.apache.gobblin.dataset.DatasetsFinder; import org.apache.gobblin.runtime.JobState; @@ -63,31 +86,9 @@ import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer; import org.apache.gobblin.util.request_allocation.RequestAllocator; import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig; import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils; -import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest; -import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor; import org.apache.gobblin.util.request_allocation.ResourceEstimator; import org.apache.gobblin.util.request_allocation.ResourcePool; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.joda.time.DateTimeUtils; - -import java.io.IOException; -import java.net.URI; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - /** * A compaction source derived from {@link Source} which uses {@link DefaultFileSystemGlobFinder} to find all * {@link Dataset}s. Use {@link CompactionSuite#getDatasetsFinderVerifiers()} to guarantee a given dataset has passed http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java index 0142f6b..7b62671 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionAvroSuite.java @@ -17,10 +17,20 @@ package org.apache.gobblin.compaction.suite; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; + +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.action.CompactionCompleteAction; import org.apache.gobblin.compaction.action.CompactionCompleteFileOperationAction; -import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction; import org.apache.gobblin.compaction.action.CompactionHiveRegistrationAction; +import org.apache.gobblin.compaction.action.CompactionMarkDirectoryAction; import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator; import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier; import org.apache.gobblin.compaction.verify.CompactionThresholdVerifier; @@ -28,14 +38,6 @@ import org.apache.gobblin.compaction.verify.CompactionTimeRangeVerifier; import org.apache.gobblin.compaction.verify.CompactionVerifier; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.FileSystemDataset; -import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; - -import java.io.IOException; -import java.util.List; -import java.util.LinkedList; -import java.util.ArrayList; /** * A type of {@link CompactionSuite} which implements all components needed for avro file compaction. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java index 3c36ba5..1c564a6 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/suite/CompactionSuite.java @@ -17,19 +17,19 @@ package org.apache.gobblin.compaction.suite; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.mapreduce.Job; + import org.apache.gobblin.compaction.action.CompactionCompleteAction; import org.apache.gobblin.compaction.mapreduce.MRCompactionTask; +import org.apache.gobblin.compaction.verify.CompactionVerifier; import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.data.management.copy.replication.ConfigBasedDatasetsFinder; import org.apache.gobblin.dataset.Dataset; -import org.apache.gobblin.compaction.verify.CompactionVerifier; -import org.apache.gobblin.configuration.State; -import org.apache.hadoop.mapreduce.Job; - -import java.io.IOException; -import java.util.List; - /** * This interface provides major components required by {@link org.apache.gobblin.compaction.source.CompactionSource} * and {@link org.apache.gobblin.compaction.mapreduce.MRCompactionTask} flow. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java index ebfe0e6..5653281 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java @@ -17,9 +17,18 @@ package org.apache.gobblin.compaction.verify; -import com.google.common.base.Splitter; +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; import org.apache.commons.lang.exception.ExceptionUtils; +import org.joda.time.DateTime; + +import com.google.common.base.Splitter; + +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.audit.AuditCountClient; import org.apache.gobblin.compaction.audit.AuditCountClientFactory; import org.apache.gobblin.compaction.mapreduce.MRCompactor; @@ -27,12 +36,6 @@ import org.apache.gobblin.compaction.parser.CompactionPathParser; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.FileSystemDataset; import org.apache.gobblin.util.ClassAliasResolver; -import lombok.extern.slf4j.Slf4j; -import org.joda.time.DateTime; -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; /** * Use {@link AuditCountClient} to retrieve all record count across different tiers http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java index 67bb63a..a2751b9 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java @@ -17,21 +17,22 @@ package org.apache.gobblin.compaction.verify; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.fs.Path; import com.google.common.collect.Lists; -import org.apache.commons.lang.exception.ExceptionUtils; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.compaction.parser.CompactionPathParser; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.FileSystemDataset; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.util.Map; /** * Compare the source and destination avro records. Determine if a compaction is needed. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java index a267ab5..06abd0a 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionTimeRangeVerifier.java @@ -18,19 +18,21 @@ package org.apache.gobblin.compaction.verify; import org.apache.commons.lang.exception.ExceptionUtils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Period; +import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.compaction.parser.CompactionPathParser; import org.apache.gobblin.compaction.source.CompactionSource; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.FileSystemDataset; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.Period; -import org.joda.time.format.PeriodFormatter; -import org.joda.time.format.PeriodFormatterBuilder; /** * A simple class which verify current dataset belongs to a specific time range. Will skip to do http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java index 25573f6..68f57a7 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionVerifier.java @@ -16,11 +16,11 @@ */ package org.apache.gobblin.compaction.verify; -import org.apache.gobblin.dataset.Dataset; - import lombok.AllArgsConstructor; import lombok.Getter; +import org.apache.gobblin.dataset.Dataset; + /** * An interface which represents a generic verifier for compaction http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java index de95255..e1bc952 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/InputRecordCountHelper.java @@ -16,8 +16,24 @@ */ package org.apache.gobblin.compaction.verify; +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + import com.google.common.base.Charsets; import com.google.common.collect.Lists; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.dataset.DatasetHelper; import org.apache.gobblin.compaction.event.CompactionSlaEventHelper; import org.apache.gobblin.compaction.mapreduce.MRCompactor; @@ -26,19 +42,6 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.RecordCountProvider; import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.BufferedReader; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URI; -import java.util.Collection; /** * A class helps to calculate, serialize, deserialize record count. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java index 3fed3e5..51fe866 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskTest.java @@ -17,20 +17,10 @@ package org.apache.gobblin.compaction.mapreduce; -import com.google.common.io.Files; -import org.apache.gobblin.compaction.audit.AuditCountClientFactory; -import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder; -import org.apache.gobblin.compaction.source.CompactionSource; -import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer; -import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories; -import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier; -import org.apache.gobblin.compaction.verify.CompactionVerifier; -import org.apache.gobblin.compaction.verify.InputRecordCountHelper; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.data.management.copy.CopyConfiguration; -import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder; -import org.apache.gobblin.runtime.api.JobExecutionResult; -import org.apache.gobblin.runtime.embedded.EmbeddedGobblin; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; @@ -43,13 +33,23 @@ import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.URI; +import com.google.common.io.Files; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.compaction.audit.AuditCountClientFactory; +import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder; +import org.apache.gobblin.compaction.source.CompactionSource; +import org.apache.gobblin.compaction.suite.TestCompactionSuiteFactories; +import org.apache.gobblin.compaction.verify.CompactionAuditCountVerifier; +import org.apache.gobblin.compaction.verify.CompactionVerifier; +import org.apache.gobblin.compaction.verify.InputRecordCountHelper; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.data.management.dataset.SimpleDatasetHierarchicalPrioritizer; +import org.apache.gobblin.data.management.retention.profile.ConfigurableGlobDatasetFinder; +import org.apache.gobblin.runtime.api.JobExecutionResult; +import org.apache.gobblin.runtime.embedded.EmbeddedGobblin; + @Slf4j public class MRCompactionTaskTest { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java index 517b609..b677cd3 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/RenameSourceDirectoryTest.java @@ -15,23 +15,23 @@ * limitations under the License. */ package org.apache.gobblin.compaction.mapreduce; -import org.apache.gobblin.compaction.dataset.Dataset; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - +import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import org.testng.Assert; +import org.apache.gobblin.compaction.dataset.Dataset; -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java index 150eee0..b243a8e 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/avro/ConfBasedDeltaFieldProviderTest.java @@ -19,7 +19,6 @@ package org.apache.gobblin.compaction.mapreduce.avro; import java.util.List; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.testng.Assert; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java index 8211710..3d51218 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/conditions/RecompactionConditionTest.java @@ -20,26 +20,25 @@ package org.apache.gobblin.compaction.mapreduce.conditions; import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.Period; -import org.joda.time.format.PeriodFormatterBuilder; import org.joda.time.format.PeriodFormatter; +import org.joda.time.format.PeriodFormatterBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; - - import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import com.google.common.collect.Lists; import com.google.common.base.Optional; -import org.apache.gobblin.compaction.conditions.RecompactionCondition; +import com.google.common.collect.Lists; + import org.apache.gobblin.compaction.conditions.RecompactionCombineCondition; +import org.apache.gobblin.compaction.conditions.RecompactionCondition; import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnDuration; import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnFileCount; import org.apache.gobblin.compaction.conditions.RecompactionConditionBasedOnRatio; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java index f876811..f9c855a 100644 --- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java +++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/verify/PinotAuditCountVerifierTest.java @@ -16,19 +16,23 @@ */ package org.apache.gobblin.compaction.verify; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.Test; + import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; + +import lombok.Getter; +import lombok.Setter; + import org.apache.gobblin.compaction.audit.AuditCountClient; import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.FileSystemDataset; -import lombok.Getter; -import lombok.Setter; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.Test; -import java.util.Map; /** * Class to test audit count verification logic http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java index df27f9f..3491013 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/ConfigurableGlobDatasetFinder.java @@ -23,12 +23,6 @@ import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -36,6 +30,13 @@ import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.GlobPattern; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.gobblin.data.management.retention.DatasetCleaner; import org.apache.gobblin.dataset.Dataset; import org.apache.gobblin.dataset.DatasetsFinder; @@ -62,9 +63,11 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement public static final String DATASET_FINDER_PATTERN_KEY = CONFIGURATION_KEY_PREFIX + "dataset.pattern"; public static final String DATASET_FINDER_BLACKLIST_KEY = CONFIGURATION_KEY_PREFIX + "dataset.blacklist"; + public static final String DATASET_FINDER_GLOB_BLACKLIST_KEY = CONFIGURATION_KEY_PREFIX + "dataset.glob.blacklist"; protected final Path datasetPattern; private final Optional<Pattern> blacklist; + private final Optional<Pattern> globPatternBlacklist; private final Path commonRoot; protected final FileSystem fs; protected final Properties props; @@ -86,6 +89,12 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement this.blacklist = Optional.absent(); } + if (ConfigUtils.hasNonEmptyPath(config, DATASET_FINDER_GLOB_BLACKLIST_KEY)) { + this.globPatternBlacklist = Optional.of(GlobPattern.compile(config.getString(DATASET_FINDER_GLOB_BLACKLIST_KEY))); + } else { + this.globPatternBlacklist = Optional.absent(); + } + this.fs = fs; Path tmpDatasetPattern; @@ -132,6 +141,9 @@ public abstract class ConfigurableGlobDatasetFinder<T extends Dataset> implement if (this.blacklist.isPresent() && this.blacklist.get().matcher(pathToMatch.toString()).find()) { continue; } + if (this.globPatternBlacklist.isPresent() && this.globPatternBlacklist.get().matcher(pathToMatch.toString()).find()) { + continue; + } LOG.info("Found dataset at " + fileStatus.getPath()); datasets.add(datasetAtPath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()))); }
