[GOBBLIN-448] Add glob pattern blacklist in ConfigurableGlobDatasetFinder Closes #2322 from yukuai518/blacklist
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5d0e944c Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5d0e944c Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5d0e944c Branch: refs/heads/master Commit: 5d0e944c3facc9814d16fa7588f958f1e763a468 Parents: 54bda27 Author: Kuai Yu <[email protected]> Authored: Tue Mar 27 11:32:30 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Mar 27 11:32:30 2018 -0700 ---------------------------------------------------------------------- .../cluster/ClusterEventMetadataGenerator.java | 1 - .../gobblin/cluster/GobblinClusterManager.java | 21 +++---- .../gobblin/cluster/GobblinClusterUtils.java | 11 ++-- .../apache/gobblin/cluster/GobblinHelixJob.java | 7 +-- .../cluster/GobblinHelixJobLauncher.java | 8 +-- .../cluster/GobblinHelixJobScheduler.java | 19 +++---- .../cluster/GobblinHelixMessagingService.java | 10 ++-- .../gobblin/cluster/GobblinHelixTask.java | 1 - .../cluster/GobblinHelixTaskStateTracker.java | 2 - .../gobblin/cluster/GobblinTaskRunner.java | 14 ++--- .../cluster/GobblinTaskRunnerMetrics.java | 9 --- .../gobblin/cluster/GobblinTaskStateModel.java | 1 - .../cluster/GobblinTaskStateModelFactory.java | 1 - .../cluster/JobConfigurationManager.java | 3 +- .../ScheduledJobConfigurationManager.java | 13 +++-- .../gobblin/cluster/SingleTaskLauncher.java | 4 +- .../StreamingJobConfigurationManager.java | 13 +++-- .../gobblin/cluster/GobblinClusterKillTest.java | 15 +++-- .../cluster/GobblinClusterUtilsTest.java | 14 +++-- .../cluster/GobblinHelixJobLauncherTest.java | 12 ++-- .../gobblin/cluster/GobblinHelixTaskTest.java | 8 +-- .../apache/gobblin/cluster/HelixUtilsTest.java | 1 - .../cluster/JobConfigurationManagerTest.java | 1 + .../gobblin/cluster/SingleTaskLauncherTest.java | 4 +- .../org/apache/gobblin/cluster/TestHelper.java | 2 - .../TestShutdownMessageHandlerFactory.java | 1 - .../compaction/ReflectionCompactorFactory.java | 4 +- .../action/CompactionCompleteAction.java | 5 +- .../CompactionCompleteFileOperationAction.java | 30 +++++----- .../CompactionHiveRegistrationAction.java | 23 ++++---- .../action/CompactionMarkDirectoryAction.java | 26 ++++----- .../audit/KafkaAuditCountHttpClient.java | 26 +++++---- .../audit/PinotAuditCountHttpClient.java | 26 +++++---- .../RecompactionCombineCondition.java | 4 +- .../RecompactionConditionBasedOnDuration.java | 11 ++-- .../RecompactionConditionBasedOnFileCount.java | 3 +- .../RecompactionConditionBasedOnRatio.java | 13 ++--- .../gobblin/compaction/dataset/Dataset.java | 8 +-- .../compaction/dataset/DatasetHelper.java | 4 +- .../compaction/dataset/DatasetsFinder.java | 4 +- .../dataset/TimeBasedSubDirDatasetsFinder.java | 19 ++++--- .../HiveRegistrationCompactorListener.java | 2 +- .../CompactionLauncherWriter.java | 7 ++- .../CompactionLauncherWriterBuilder.java | 2 + .../HiveMetadataForCompactionExtractor.java | 8 ++- ...veMetadataForCompactionExtractorFactory.java | 8 ++- .../hivebasedconstructs/MRCompactionEntity.java | 2 + .../ReflectionCompactorListenerFactory.java | 4 +- .../SimpleCompactorCompletionListener.java | 3 +- .../CompactionAvroJobConfigurator.java | 60 +++++++++++--------- .../compaction/mapreduce/MRCompactionTask.java | 16 ++++-- .../mapreduce/MRCompactionTaskFactory.java | 1 - .../compaction/mapreduce/MRCompactor.java | 26 ++++----- .../mapreduce/MRCompactorJobPropCreator.java | 5 +- .../mapreduce/MRCompactorJobRunner.java | 3 - .../avro/AvroKeyCombineFileRecordReader.java | 5 +- .../AvroKeyRecursiveCombineFileInputFormat.java | 12 ---- .../FieldAttributeBasedDeltaFieldsProvider.java | 7 +-- .../compaction/parser/CompactionPathParser.java | 6 +- .../compaction/source/CompactionSource.java | 57 ++++++++++--------- .../compaction/suite/CompactionAvroSuite.java | 20 ++++--- .../compaction/suite/CompactionSuite.java | 14 ++--- .../verify/CompactionAuditCountVerifier.java | 17 +++--- .../verify/CompactionThresholdVerifier.java | 15 ++--- .../verify/CompactionTimeRangeVerifier.java | 16 +++--- .../compaction/verify/CompactionVerifier.java | 4 +- .../verify/InputRecordCountHelper.java | 29 +++++----- .../mapreduce/MRCompactionTaskTest.java | 36 ++++++------ .../mapreduce/RenameSourceDirectoryTest.java | 18 +++--- .../avro/ConfBasedDeltaFieldProviderTest.java | 1 - .../conditions/RecompactionConditionTest.java | 11 ++-- .../verify/PinotAuditCountVerifierTest.java | 16 ++++-- .../profile/ConfigurableGlobDatasetFinder.java | 24 ++++++-- 73 files changed, 435 insertions(+), 422 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java index 6aeb89c..842e285 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ClusterEventMetadataGenerator.java @@ -23,7 +23,6 @@ import java.util.Map; import com.google.common.collect.ImmutableMap; import org.apache.gobblin.annotation.Alias; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.event.EventName; import org.apache.gobblin.runtime.EventMetadataUtils; import org.apache.gobblin.runtime.JobContext; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index d57c61e..1592cf8 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -20,7 +20,6 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -36,13 +35,6 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.lang.StringUtils; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareHistogram; -import org.apache.gobblin.metrics.ContextAwareMetric; -import org.apache.gobblin.metrics.GobblinMetrics; -import org.apache.gobblin.metrics.MetricContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -67,8 +59,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -80,9 +72,17 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import javax.annotation.Nonnull; +import lombok.Getter; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.event.ClusterManagerShutdownRequest; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareHistogram; +import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.api.MutableJobCatalog; import org.apache.gobblin.runtime.app.ApplicationException; @@ -93,9 +93,6 @@ import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; -import javax.annotation.Nonnull; -import lombok.Getter; - /** * The central cluster manager for Gobblin Clusters. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java index 6b6ead8..41f926e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java @@ -17,19 +17,20 @@ package org.apache.gobblin.cluster; -import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; - import java.net.InetAddress; import java.net.UnknownHostException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + import com.typesafe.config.Config; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.AbstractJobLauncher; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import lombok.extern.slf4j.Slf4j; +import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; @Alpha @Slf4j http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java index bc9f88f..db34e3e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJob.java @@ -20,20 +20,19 @@ package org.apache.gobblin.cluster; import java.util.Properties; import java.util.concurrent.Future; -import lombok.extern.slf4j.Slf4j; - - import org.quartz.InterruptableJob; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; +import org.quartz.UnableToInterruptJobException; + +import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.BaseGobblinJob; import org.apache.gobblin.scheduler.JobScheduler; -import org.quartz.UnableToInterruptJobException; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index d502462..5035216 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -25,8 +25,6 @@ import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import org.apache.gobblin.runtime.JobException; -import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -47,6 +45,8 @@ import com.google.common.collect.Maps; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; +import javax.annotation.Nullable; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.StateStore; @@ -55,11 +55,13 @@ import org.apache.gobblin.metrics.event.TimingEvent; import org.apache.gobblin.rest.LauncherTypeEnum; import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.ExecutionModel; +import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.Task; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.TaskStateCollectorService; +import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; @@ -69,8 +71,6 @@ import org.apache.gobblin.util.JobLauncherUtils; import org.apache.gobblin.util.ParallelRunner; import org.apache.gobblin.util.SerializationUtils; -import javax.annotation.Nullable; - /** * An implementation of {@link JobLauncher} that launches a Gobblin job using the Helix task framework. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index ef12162..e539273 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -30,8 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.PropertiesUtils; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.slf4j.Logger; @@ -42,31 +40,32 @@ import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +import javax.annotation.Nonnull; +import lombok.Getter; + import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent; import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent; import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.runtime.api.JobExecutionLauncher; -import org.apache.gobblin.runtime.api.MutableJobCatalog; +import org.apache.gobblin.runtime.JobContext; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobLauncher; -import org.apache.gobblin.runtime.JobContext; import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.api.JobExecutionLauncher; +import org.apache.gobblin.runtime.api.MutableJobCatalog; import org.apache.gobblin.runtime.listeners.AbstractJobListener; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; - - -import javax.annotation.Nonnull; -import lombok.Getter; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PropertiesUtils; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java index cdced1b..4101f17 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java @@ -16,29 +16,29 @@ */ package org.apache.gobblin.cluster; -import com.google.common.base.Strings; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; - import java.util.UUID; import java.util.regex.Pattern; + import org.apache.helix.Criteria; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; import org.apache.helix.messaging.CriteriaEvaluator; import org.apache.helix.messaging.DefaultMessagingService; import org.apache.helix.messaging.ZNRecordRow; -import org.apache.helix.PropertyKey; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.Message; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index c6b9514..e651b8e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -35,7 +35,6 @@ import com.google.common.io.Closer; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.workunit.MultiWorkUnit; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java index 79df1fa..7120bef 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java @@ -22,8 +22,6 @@ import java.util.Properties; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; -import org.apache.helix.HelixManager; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 3ec40dc..e68774d 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -36,11 +36,6 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.Tag; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -74,9 +69,15 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import javax.annotation.Nonnull; +import lombok.Getter; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.GobblinMetrics; +import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.runtime.TaskExecutor; import org.apache.gobblin.runtime.TaskStateTracker; import org.apache.gobblin.runtime.services.JMXReportingService; @@ -86,9 +87,6 @@ import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.JvmUtils; import org.apache.gobblin.util.PathUtils; -import javax.annotation.Nonnull; -import lombok.Getter; - import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java index 51e8b36..6435ff4 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java @@ -17,17 +17,8 @@ package org.apache.gobblin.cluster; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import com.codahale.metrics.Metric; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.runtime.TaskExecutor; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java index 5a96e48..cc4a215 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModel.java @@ -25,7 +25,6 @@ import org.apache.helix.task.TaskFactory; import org.apache.helix.task.TaskStateModel; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.runtime.TaskExecutor; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java index 335a1e0..3603571 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskStateModelFactory.java @@ -26,7 +26,6 @@ import org.apache.helix.task.TaskStateModel; import org.apache.helix.task.TaskStateModelFactory; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.runtime.TaskExecutor; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java index 42fab27..d60540c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/JobConfigurationManager.java @@ -21,7 +21,6 @@ import java.io.File; import java.util.List; import java.util.Properties; -import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +29,8 @@ import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; +import javax.annotation.Nullable; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent; import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java index 0f2d356..00c7b0d 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java @@ -27,23 +27,24 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; -import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExecutorsUtils; -import org.apache.gobblin.runtime.api.SpecConsumer; -import org.apache.gobblin.runtime.api.SpecExecutor; @Alpha http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java index 8cbbc00..3a0c780 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskLauncher.java @@ -27,11 +27,11 @@ import org.apache.commons.lang3.text.StrTokenizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.typesafe.config.Config; + import org.apache.gobblin.util.GobblinProcessBuilder; import org.apache.gobblin.util.SystemPropertiesWrapper; -import com.typesafe.config.Config; - import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.CLUSTER_CONFIG_FILE_PATH; import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.JOB_ID; import static org.apache.gobblin.cluster.SingleTaskRunnerMainOptions.WORK_UNIT_FILE_PATH; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java index 3c01704..b6985a1 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java @@ -25,27 +25,28 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.Getter; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.MutableJobCatalog; import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecConsumer; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; -import org.apache.gobblin.runtime.api.SpecConsumer; - -import lombok.Getter; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java index ad19135..11b8808 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterKillTest.java @@ -17,11 +17,6 @@ package org.apache.gobblin.cluster; -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigValueFactory; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -31,9 +26,9 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Iterator; import java.util.concurrent.TimeoutException; + import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; -import org.apache.gobblin.testing.AssertWithBackoff; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.slf4j.Logger; @@ -43,6 +38,14 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.testing.AssertWithBackoff; + /** * Unit tests for killing {@link GobblinClusterManager}s and {@link GobblinTaskRunner}s http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java index 4d83658..9288d42 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java @@ -17,18 +17,20 @@ package org.apache.gobblin.cluster; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; - -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; import java.util.HashMap; import java.util.Map; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.testng.annotations.Test; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + public class GobblinClusterUtilsTest { FileSystem fs = mock(FileSystem.class); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index 10ef3db..ef5fc4f 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -29,10 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.avro.Schema; import org.apache.curator.test.TestingServer; -import org.apache.gobblin.metastore.DatasetStateStore; -import org.apache.gobblin.runtime.JobContext; -import org.apache.gobblin.runtime.listeners.AbstractJobListener; -import org.apache.gobblin.util.ClassAliasResolver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -56,16 +52,20 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import lombok.Getter; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metastore.DatasetStateStore; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.FsDatasetStateStore; +import org.apache.gobblin.runtime.JobContext; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.listeners.AbstractJobListener; +import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; -import lombok.Getter; - /** * Unit tests for {@link GobblinHelixJobLauncher}. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java index 1cbd72e..a104b7d 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java @@ -17,26 +17,20 @@ package org.apache.gobblin.cluster; -import com.typesafe.config.ConfigFactory; -import org.apache.gobblin.metastore.FsStateStore; import java.io.File; import java.io.IOException; import java.util.Map; import java.util.Properties; import org.apache.avro.Schema; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.helix.HelixManager; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskResult; - import org.mockito.Mockito; - import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -44,10 +38,12 @@ import org.testng.annotations.Test; import com.google.common.base.Optional; import com.google.common.collect.Maps; +import com.typesafe.config.ConfigFactory; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.example.simplejson.SimpleJsonConverter; import org.apache.gobblin.example.simplejson.SimpleJsonSource; +import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.TaskExecutor; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java index 2afd01f..d7841a8 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; - import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java index 73098f2..9e36d8c 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/JobConfigurationManagerTest.java @@ -39,6 +39,7 @@ import com.google.common.io.Files; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; + import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent; import org.apache.gobblin.configuration.ConfigurationKeys; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java index afa933d..3c56428 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/SingleTaskLauncherTest.java @@ -25,11 +25,11 @@ import java.util.List; import org.testng.annotations.Test; +import com.typesafe.config.ConfigFactory; + import org.apache.gobblin.util.GobblinProcessBuilder; import org.apache.gobblin.util.SystemPropertiesWrapper; -import com.typesafe.config.ConfigFactory; - import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java index b717585..e8b32ab 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestHelper.java @@ -26,12 +26,10 @@ import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; - import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.TestingServer; - import org.testng.Assert; import com.google.common.io.Closer; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java index c0b80a9..cd805bf 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestShutdownMessageHandlerFactory.java @@ -25,7 +25,6 @@ import org.apache.helix.messaging.handling.HelixTaskResult; import org.apache.helix.messaging.handling.MessageHandler; import org.apache.helix.messaging.handling.MessageHandlerFactory; import org.apache.helix.model.Message; - import org.testng.Assert; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java index fc68e47..1ba6fac 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/ReflectionCompactorFactory.java @@ -20,11 +20,11 @@ package org.apache.gobblin.compaction; import java.util.List; import java.util.Properties; +import org.apache.commons.lang3.reflect.ConstructorUtils; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import org.apache.commons.lang3.reflect.ConstructorUtils; - import org.apache.gobblin.compaction.listeners.CompactorListener; import org.apache.gobblin.metrics.Tag; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java index a5b21bc..8c5979c 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteAction.java @@ -16,11 +16,12 @@ */ package org.apache.gobblin.compaction.action; -import org.apache.gobblin.dataset.Dataset; -import org.apache.gobblin.metrics.event.EventSubmitter; import java.io.IOException; +import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.metrics.event.EventSubmitter; + /** * An interface which represents an action that is invoked after a compaction job is finished. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java index a21ca93..831443b 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java @@ -17,9 +17,21 @@ package org.apache.gobblin.compaction.action; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Job; + import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import org.apache.gobblin.compaction.dataset.DatasetHelper; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.event.CompactionSlaEventHelper; import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator; import org.apache.gobblin.compaction.mapreduce.MRCompactor; @@ -33,20 +45,6 @@ import org.apache.gobblin.dataset.FileSystemDataset; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.WriterUtils; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.mapreduce.Counter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.TaskCompletionEvent; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java index 7792a50..a7536d3 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java @@ -17,6 +17,18 @@ package org.apache.gobblin.compaction.action; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; + +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.event.CompactionSlaEventHelper; import org.apache.gobblin.compaction.parser.CompactionPathParser; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -29,17 +41,6 @@ import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase; import org.apache.gobblin.hive.spec.HiveSpec; import org.apache.gobblin.metrics.event.EventSubmitter; -import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; - /** * Class responsible for hive registration after compaction is complete http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java index b504996..89be94a 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionMarkDirectoryAction.java @@ -17,6 +17,19 @@ package org.apache.gobblin.compaction.action; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.event.CompactionSlaEventHelper; import org.apache.gobblin.compaction.mapreduce.CompactionAvroJobConfigurator; import org.apache.gobblin.compaction.mapreduce.MRCompactor; @@ -25,19 +38,6 @@ import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.dataset.FileSystemDataset; import org.apache.gobblin.metrics.event.EventSubmitter; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; - @Slf4j @AllArgsConstructor http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java index 7f6fb68..04988fe 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/KafkaAuditCountHttpClient.java @@ -17,15 +17,11 @@ package org.apache.gobblin.compaction.audit; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Maps; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.gobblin.configuration.State; +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; -import javax.annotation.concurrent.ThreadSafe; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -35,10 +31,16 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.util.EntityUtils; -import java.io.IOException; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import javax.annotation.concurrent.ThreadSafe; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; /** * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java index 0e06606..5889edd 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/audit/PinotAuditCountHttpClient.java @@ -17,15 +17,11 @@ package org.apache.gobblin.compaction.audit; -import com.google.api.client.util.Charsets; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import org.apache.gobblin.configuration.State; +import java.io.IOException; +import java.net.URLEncoder; +import java.util.HashMap; +import java.util.Map; -import javax.annotation.concurrent.ThreadSafe; -import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; @@ -35,10 +31,16 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.util.EntityUtils; -import java.io.IOException; -import java.net.URLEncoder; -import java.util.HashMap; -import java.util.Map; +import com.google.api.client.util.Charsets; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import javax.annotation.concurrent.ThreadSafe; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.State; /** * A {@link AuditCountClient} which uses {@link org.apache.http.client.HttpClient} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java index 811be5b..a7731f6 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionCombineCondition.java @@ -17,17 +17,17 @@ package org.apache.gobblin.compaction.conditions; - import java.lang.reflect.InvocationTargetException; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.google.common.collect.ImmutableList; import org.apache.gobblin.annotation.Alias; -import org.apache.gobblin.compaction.dataset.DatasetHelper; import org.apache.gobblin.compaction.dataset.Dataset; +import org.apache.gobblin.compaction.dataset.DatasetHelper; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java index c432ed2..3704fab 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnDuration.java @@ -17,12 +17,6 @@ package org.apache.gobblin.compaction.conditions; - -import org.apache.gobblin.annotation.Alias; -import org.apache.gobblin.compaction.dataset.DatasetHelper; -import org.apache.gobblin.compaction.dataset.Dataset; -import org.apache.gobblin.compaction.mapreduce.MRCompactor; - import org.joda.time.DateTime; import org.joda.time.Period; import org.joda.time.format.PeriodFormatter; @@ -32,6 +26,11 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.compaction.dataset.Dataset; +import org.apache.gobblin.compaction.dataset.DatasetHelper; +import org.apache.gobblin.compaction.mapreduce.MRCompactor; + /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java index b8817fe..9e8e25b 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnFileCount.java @@ -16,13 +16,14 @@ */ package org.apache.gobblin.compaction.conditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.compaction.dataset.Dataset; import org.apache.gobblin.compaction.dataset.DatasetHelper; import org.apache.gobblin.compaction.mapreduce.MRCompactor; -import org.apache.gobblin.compaction.dataset.Dataset; /** * An implementation {@link RecompactionCondition} which examines the number of files in the late outputDir http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java index d58a17d..e2bcdb4 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/conditions/RecompactionConditionBasedOnRatio.java @@ -17,16 +17,9 @@ package org.apache.gobblin.compaction.conditions; - import java.util.List; import java.util.Map; -import org.apache.gobblin.annotation.Alias; -import org.apache.gobblin.compaction.dataset.DatasetHelper; -import org.apache.gobblin.compaction.dataset.Dataset; -import org.apache.gobblin.compaction.mapreduce.MRCompactor; -import org.apache.gobblin.util.DatasetFilterUtils; - import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +27,12 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Splitter; import com.google.common.collect.Maps; +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.compaction.dataset.Dataset; +import org.apache.gobblin.compaction.dataset.DatasetHelper; +import org.apache.gobblin.compaction.mapreduce.MRCompactor; +import org.apache.gobblin.util.DatasetFilterUtils; + /** * An implementation {@link RecompactionCondition} which examines the late record percentage. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java index 73f12a9..36ec939 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/Dataset.java @@ -22,10 +22,6 @@ import java.util.Collections; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; - import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; @@ -33,6 +29,10 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.FileSystemDataset; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java index 2eede65..ca05822 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetHelper.java @@ -26,14 +26,14 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.joda.time.DateTimeZone; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.base.Optional; import org.apache.gobblin.compaction.conditions.RecompactionCondition; import org.apache.gobblin.compaction.conditions.RecompactionConditionFactory; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java index cd56460..6ed04af 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/DatasetsFinder.java @@ -17,17 +17,14 @@ package org.apache.gobblin.compaction.dataset; -import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils; import java.io.IOException; import java.net.URI; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -39,6 +36,7 @@ import com.google.common.collect.Lists; import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils; import org.apache.gobblin.util.DatasetFilterUtils; import org.apache.gobblin.util.HadoopUtils; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java index b00d7f4..111fe75 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/dataset/TimeBasedSubDirDatasetsFinder.java @@ -17,12 +17,9 @@ package org.apache.gobblin.compaction.dataset; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; -import org.apache.gobblin.compaction.mapreduce.MRCompactor; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.util.DatasetFilterUtils; -import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.util.Set; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -35,8 +32,14 @@ import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.PeriodFormatter; import org.joda.time.format.PeriodFormatterBuilder; -import java.io.IOException; -import java.util.Set; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.compaction.mapreduce.MRCompactor; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.util.DatasetFilterUtils; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java index 3eec1d5..d6b5d81 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/registration/HiveRegistrationCompactorListener.java @@ -19,8 +19,8 @@ package org.apache.gobblin.compaction.hive.registration; import java.util.Properties; -import org.apache.gobblin.compaction.listeners.CompactorListener; import org.apache.gobblin.compaction.dataset.Dataset; +import org.apache.gobblin.compaction.listeners.CompactorListener; import org.apache.gobblin.configuration.State; import org.apache.gobblin.hive.HiveRegister; import org.apache.gobblin.hive.policy.HiveRegistrationPolicy; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java index c6a66c6..43c5fbc 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriter.java @@ -21,15 +21,18 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; + import org.apache.hadoop.fs.Path; + +import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Joiner; + import org.apache.gobblin.compaction.listeners.CompactorListener; import org.apache.gobblin.compaction.mapreduce.MRCompactor; +import org.apache.gobblin.compaction.mapreduce.avro.ConfBasedDeltaFieldProvider; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.writer.DataWriter; -import org.apache.gobblin.compaction.mapreduce.avro.ConfBasedDeltaFieldProvider; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java index d108343..156cc54 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/CompactionLauncherWriterBuilder.java @@ -18,7 +18,9 @@ package org.apache.gobblin.compaction.hivebasedconstructs; import java.io.IOException; + import org.apache.avro.Schema; + import org.apache.gobblin.writer.DataWriter; import org.apache.gobblin.writer.DataWriterBuilder; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java index c5b817e..020da2d 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractor.java @@ -19,20 +19,24 @@ package org.apache.gobblin.compaction.hivebasedconstructs; import java.io.IOException; import java.util.List; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.thrift.TException; + import com.google.common.base.Splitter; + +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor; import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.util.AutoReturnableObject; -import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor; -import lombok.extern.slf4j.Slf4j; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java index 27958b5..ab89df5 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/HiveMetadataForCompactionExtractorFactory.java @@ -17,14 +17,16 @@ package org.apache.gobblin.compaction.hivebasedconstructs; -import org.apache.gobblin.configuration.WorkUnitState; -import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor; -import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractorFactory; import java.io.IOException; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.thrift.TException; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractor; +import org.apache.gobblin.data.management.conversion.hive.extractor.HiveBaseExtractorFactory; + /** * Factory for {@link HiveMetadataForCompactionExtractor} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java index 2d721d0..1f17955 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hivebasedconstructs/MRCompactionEntity.java @@ -19,7 +19,9 @@ package org.apache.gobblin.compaction.hivebasedconstructs; import java.util.List; import java.util.Properties; + import org.apache.hadoop.fs.Path; + import lombok.Getter; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java index e9a1fca..b780c2c 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/ReflectionCompactorListenerFactory.java @@ -21,12 +21,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Properties; +import org.apache.commons.lang3.reflect.ConstructorUtils; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Strings; -import org.apache.commons.lang3.reflect.ConstructorUtils; - import org.apache.gobblin.configuration.State; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java index b4365bc..e82bb33 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/listeners/SimpleCompactorCompletionListener.java @@ -18,12 +18,13 @@ package org.apache.gobblin.compaction.listeners; import java.util.Set; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.gobblin.annotation.Alias; -import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.compaction.dataset.Dataset; +import org.apache.gobblin.compaction.mapreduce.MRCompactor; import org.apache.gobblin.configuration.State; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java index b8f407f..c9b7708 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java @@ -17,25 +17,16 @@ package org.apache.gobblin.compaction.mapreduce; -import com.google.common.base.Enums; -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.primitives.Ints; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; -import org.apache.gobblin.compaction.dataset.DatasetHelper; -import org.apache.gobblin.compaction.mapreduce.avro.*; -import org.apache.gobblin.compaction.parser.CompactionPathParser; -import org.apache.gobblin.compaction.verify.InputRecordCountHelper; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.dataset.Dataset; -import org.apache.gobblin.dataset.FileSystemDataset; -import org.apache.gobblin.util.AvroUtils; -import org.apache.gobblin.util.FileListUtils; -import org.apache.gobblin.util.HadoopUtils; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.avro.mapred.AvroKey; import org.apache.avro.mapred.AvroValue; @@ -52,15 +43,30 @@ import org.apache.hadoop.mapreduce.TaskCompletionEvent; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; +import com.google.common.base.Enums; +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.primitives.Ints; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.compaction.dataset.DatasetHelper; +import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyCompactorOutputFormat; +import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyDedupReducer; +import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyMapper; +import org.apache.gobblin.compaction.mapreduce.avro.AvroKeyRecursiveCombineFileInputFormat; +import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner; +import org.apache.gobblin.compaction.parser.CompactionPathParser; +import org.apache.gobblin.compaction.verify.InputRecordCountHelper; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.dataset.FileSystemDataset; +import org.apache.gobblin.util.AvroUtils; +import org.apache.gobblin.util.FileListUtils; +import org.apache.gobblin.util.HadoopUtils; /** * A configurator that focused on creating avro compaction map-reduce job http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java index be71804..78ed1c2 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTask.java @@ -17,6 +17,16 @@ package org.apache.gobblin.compaction.mapreduce; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Job; + +import com.google.common.collect.ImmutableMap; + +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.compaction.action.CompactionCompleteAction; import org.apache.gobblin.compaction.event.CompactionSlaEventHelper; import org.apache.gobblin.compaction.suite.CompactionSuite; @@ -27,13 +37,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.runtime.TaskContext; import org.apache.gobblin.runtime.mapreduce.MRTask; -import java.util.List; -import java.io.IOException; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.mapreduce.Job; -import com.google.common.collect.ImmutableMap; /** * Customized task of type {@link MRTask}, which runs MR job to compact dataset. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java index ff8b012..9d5b749 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactionTaskFactory.java @@ -18,7 +18,6 @@ package org.apache.gobblin.compaction.mapreduce; import java.io.IOException; - import org.apache.gobblin.runtime.TaskContext; import org.apache.gobblin.runtime.mapreduce.MRTaskFactory; import org.apache.gobblin.runtime.task.TaskIFace; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d0e944c/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java ---------------------------------------------------------------------- diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java index 9e53ef4..953c0dc 100644 --- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java +++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactor.java @@ -17,13 +17,6 @@ package org.apache.gobblin.compaction.mapreduce; -import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.COMPACTION_COMPLETE; -import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.GIVEN_UP; -import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.UNVERIFIED; -import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.VERIFIED; -import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.ABORTED; -import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.COMMITTED; - import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; @@ -36,9 +29,7 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import java.lang.reflect.InvocationTargetException; -import org.joda.time.DateTime; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -66,13 +57,13 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.gobblin.compaction.Compactor; -import org.apache.gobblin.compaction.listeners.CompactorCompletionListener; -import org.apache.gobblin.compaction.listeners.CompactorCompletionListenerFactory; -import org.apache.gobblin.compaction.listeners.CompactorListener; import org.apache.gobblin.compaction.dataset.Dataset; import org.apache.gobblin.compaction.dataset.DatasetsFinder; import org.apache.gobblin.compaction.dataset.TimeBasedSubDirDatasetsFinder; import org.apache.gobblin.compaction.event.CompactionSlaEventHelper; +import org.apache.gobblin.compaction.listeners.CompactorCompletionListener; +import org.apache.gobblin.compaction.listeners.CompactorCompletionListenerFactory; +import org.apache.gobblin.compaction.listeners.CompactorListener; import org.apache.gobblin.compaction.verify.DataCompletenessVerifier; import org.apache.gobblin.compaction.verify.DataCompletenessVerifier.Results; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -81,15 +72,22 @@ import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ClusterNameTags; import org.apache.gobblin.util.DatasetFilterUtils; import org.apache.gobblin.util.ExecutorsUtils; -import org.apache.gobblin.util.HadoopUtils; -import org.apache.gobblin.util.ClusterNameTags; import org.apache.gobblin.util.FileListUtils; +import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.recordcount.CompactionRecordCountProvider; import org.apache.gobblin.util.recordcount.IngestionRecordCountProvider; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; +import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.COMPACTION_COMPLETE; +import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.GIVEN_UP; +import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.UNVERIFIED; +import static org.apache.gobblin.compaction.dataset.Dataset.DatasetState.VERIFIED; +import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.ABORTED; +import static org.apache.gobblin.compaction.mapreduce.MRCompactorJobRunner.Status.COMMITTED; + /** * MapReduce-based {@link org.apache.gobblin.compaction.Compactor}. Compaction will run on each qualified {@link Dataset} * under {@link #COMPACTION_INPUT_DIR}.
