This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new fd38b4b  [GOBBLIN-806] Enable metrics reporter during dataset 
discovery for retention job
fd38b4b is described below

commit fd38b4b48520b2ca9c75de1613169b59de0a3dc2
Author: Zihan Li <[email protected]>
AuthorDate: Wed Jun 19 16:59:38 2019 -0700

    [GOBBLIN-806] Enable metrics reporter during dataset discovery for 
retention job
    
    Enable metrics reporter during dataset discovery
    for retention job
    
    Enable event submitter for dataset finder imported
    by tag
    
    add SuppressWarnings
    
    address comments
    
    Closes #2672 from ZihanLi58/GOBBLIN-806
---
 .../data/management/retention/DatasetCleaner.java       | 16 ++++++++--------
 .../retention/profile/MultiCleanableDatasetFinder.java  |  8 +++++++-
 .../retention/profile/MultiDatasetFinder.java           | 17 +++++++++++------
 3 files changed, 26 insertions(+), 15 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index 6793f5a..848b0d2 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -92,6 +92,13 @@ public class DatasetCleaner implements Instrumentable, 
Closeable {
     FileSystem targetFs =
         props.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ? 
WriterUtils.getWriterFs(state) : fs;
     this.closer = Closer.create();
+    // TODO -- Remove the dependency on gobblin-core after new Gobblin Metrics 
does not depend on gobblin-core.
+    List<Tag<?>> tags = Lists.newArrayList();
+    tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
+    this.metricContext =
+        this.closer.register(Instrumented.getMetricContext(new State(props), 
DatasetCleaner.class, tags));
+    this.isMetricEnabled = GobblinMetrics.isEnabled(props);
+    this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, 
RetentionEvents.NAMESPACE).build();
     try {
       FileSystem optionalRateControlledFs = targetFs;
       if (props.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) {
@@ -100,7 +107,7 @@ public class DatasetCleaner implements Instrumentable, 
Closeable {
         ((RateControlledFileSystem) 
optionalRateControlledFs).startRateControl();
       }
 
-      this.datasetFinder = new 
MultiCleanableDatasetFinder(optionalRateControlledFs, props);
+      this.datasetFinder = new 
MultiCleanableDatasetFinder(optionalRateControlledFs, props, eventSubmitter);
     } catch (NumberFormatException exception) {
       throw new IOException(exception);
     } catch (ExecutionException exception) {
@@ -111,13 +118,6 @@ public class DatasetCleaner implements Instrumentable, 
Closeable {
         100, ExecutorsUtils.newThreadFactory(Optional.of(LOG), 
Optional.of("Dataset-cleaner-pool-%d")));
     this.service = ExecutorsUtils.loggingDecorator(executor);
 
-    List<Tag<?>> tags = Lists.newArrayList();
-    tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
-    // TODO -- Remove the dependency on gobblin-core after new Gobblin Metrics 
does not depend on gobblin-core.
-    this.metricContext =
-        this.closer.register(Instrumented.getMetricContext(new State(props), 
DatasetCleaner.class, tags));
-    this.isMetricEnabled = GobblinMetrics.isEnabled(props);
-    this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, 
RetentionEvents.NAMESPACE).build();
     this.throwables = Lists.newArrayList();
   }
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiCleanableDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiCleanableDatasetFinder.java
index 93479e5..36007b3 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiCleanableDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiCleanableDatasetFinder.java
@@ -16,11 +16,13 @@
  */
 package org.apache.gobblin.data.management.retention.profile;
 
+import com.google.common.base.Optional;
 import org.apache.gobblin.data.management.retention.DatasetCleaner;
 
 import java.net.URI;
 import java.util.Properties;
 
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.hadoop.fs.FileSystem;
 
 import com.typesafe.config.Config;
@@ -49,7 +51,11 @@ public class MultiCleanableDatasetFinder extends 
MultiDatasetFinder {
 
 
   public MultiCleanableDatasetFinder(FileSystem fs, Properties jobProps) {
-    super(fs, jobProps);
+    this(fs,jobProps,new 
EventSubmitter.Builder(Optional.absent(),"noMessage").build());
+  }
+
+  public MultiCleanableDatasetFinder(FileSystem fs, Properties jobProps, 
EventSubmitter eventSubmitter) {
+    super(fs, jobProps, eventSubmitter);
   }
 
   @Override
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiDatasetFinder.java
index d1bb84e..8c38ae7 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiDatasetFinder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.data.management.retention.profile;
 
+import com.google.common.base.Optional;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -26,7 +27,7 @@ import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -72,6 +73,10 @@ public abstract class MultiDatasetFinder implements 
DatasetsFinder<Dataset> {
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   public MultiDatasetFinder(FileSystem fs, Properties jobProps) {
+    this(fs,jobProps,new 
EventSubmitter.Builder(Optional.absent(),"noMessage").build());
+  }
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public MultiDatasetFinder(FileSystem fs, Properties jobProps, EventSubmitter 
eventSubmitter) {
     this.jobProps = jobProps;
     try {
       this.datasetFinders = Lists.newArrayList();
@@ -79,10 +84,9 @@ public abstract class MultiDatasetFinder implements 
DatasetsFinder<Dataset> {
       if (jobProps.containsKey(datasetFinderClassKey())) {
         try {
           log.info(String.format("Instantiating datasetfinder %s ", 
jobProps.getProperty(datasetFinderClassKey())));
-          this.datasetFinders.add((DatasetsFinder) 
ConstructorUtils.invokeConstructor(
-              Class.forName(jobProps.getProperty(datasetFinderClassKey())), 
fs, jobProps));
-        } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
-            | ClassNotFoundException e) {
+          this.datasetFinders.add((DatasetsFinder) 
GobblinConstructorUtils.invokeLongestConstructor(
+              Class.forName(jobProps.getProperty(datasetFinderClassKey())), 
fs, jobProps, eventSubmitter));
+        } catch (ReflectiveOperationException e) {
           log.error(
               String.format("Retention ignored could not instantiate 
datasetfinder %s.",
                   jobProps.getProperty(datasetFinderClassKey())), e);
@@ -106,7 +110,8 @@ public abstract class MultiDatasetFinder implements 
DatasetsFinder<Dataset> {
           try {
             this.datasetFinders.add((DatasetsFinder) 
GobblinConstructorUtils.invokeFirstConstructor(
                 
Class.forName(datasetClassConfig.getString(datasetFinderClassKey())), 
ImmutableList.of(fs, jobProps,
-                    datasetClassConfig), ImmutableList.of(fs, jobProps)));
+                    datasetClassConfig), ImmutableList.of(fs, jobProps, 
eventSubmitter),
+                    ImmutableList.of(fs, jobProps)));
             log.info(String.format("Instantiated datasetfinder %s for %s.",
                 datasetClassConfig.getString(datasetFinderClassKey()), 
importedBy));
           } catch (InstantiationException | IllegalAccessException | 
IllegalArgumentException

Reply via email to