This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new fd38b4b [GOBBLIN-806] Enable metrics reporter during dataset
discovery for retention job
fd38b4b is described below
commit fd38b4b48520b2ca9c75de1613169b59de0a3dc2
Author: Zihan Li <[email protected]>
AuthorDate: Wed Jun 19 16:59:38 2019 -0700
[GOBBLIN-806] Enable metrics reporter during dataset discovery for retention job

- Enable the metrics reporter during dataset discovery for the retention job.
- Enable the event submitter for dataset finders imported by tag.
- Add @SuppressWarnings annotations.
- Address review comments.
Closes #2672 from ZihanLi58/GOBBLIN-806
---
.../data/management/retention/DatasetCleaner.java | 16 ++++++++--------
.../retention/profile/MultiCleanableDatasetFinder.java | 8 +++++++-
.../retention/profile/MultiDatasetFinder.java | 17 +++++++++++------
3 files changed, 26 insertions(+), 15 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
index 6793f5a..848b0d2 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/DatasetCleaner.java
@@ -92,6 +92,13 @@ public class DatasetCleaner implements Instrumentable,
Closeable {
FileSystem targetFs =
props.containsKey(ConfigurationKeys.WRITER_FILE_SYSTEM_URI) ?
WriterUtils.getWriterFs(state) : fs;
this.closer = Closer.create();
+ // TODO -- Remove the dependency on gobblin-core after new Gobblin Metrics
does not depend on gobblin-core.
+ List<Tag<?>> tags = Lists.newArrayList();
+ tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
+ this.metricContext =
+ this.closer.register(Instrumented.getMetricContext(new State(props),
DatasetCleaner.class, tags));
+ this.isMetricEnabled = GobblinMetrics.isEnabled(props);
+ this.eventSubmitter = new EventSubmitter.Builder(this.metricContext,
RetentionEvents.NAMESPACE).build();
try {
FileSystem optionalRateControlledFs = targetFs;
if (props.contains(DATASET_CLEAN_HDFS_CALLS_PER_SECOND_LIMIT)) {
@@ -100,7 +107,7 @@ public class DatasetCleaner implements Instrumentable,
Closeable {
((RateControlledFileSystem)
optionalRateControlledFs).startRateControl();
}
- this.datasetFinder = new
MultiCleanableDatasetFinder(optionalRateControlledFs, props);
+ this.datasetFinder = new
MultiCleanableDatasetFinder(optionalRateControlledFs, props, eventSubmitter);
} catch (NumberFormatException exception) {
throw new IOException(exception);
} catch (ExecutionException exception) {
@@ -111,13 +118,6 @@ public class DatasetCleaner implements Instrumentable,
Closeable {
100, ExecutorsUtils.newThreadFactory(Optional.of(LOG),
Optional.of("Dataset-cleaner-pool-%d")));
this.service = ExecutorsUtils.loggingDecorator(executor);
- List<Tag<?>> tags = Lists.newArrayList();
- tags.addAll(Tag.fromMap(AzkabanTags.getAzkabanTags()));
- // TODO -- Remove the dependency on gobblin-core after new Gobblin Metrics
does not depend on gobblin-core.
- this.metricContext =
- this.closer.register(Instrumented.getMetricContext(new State(props),
DatasetCleaner.class, tags));
- this.isMetricEnabled = GobblinMetrics.isEnabled(props);
- this.eventSubmitter = new EventSubmitter.Builder(this.metricContext,
RetentionEvents.NAMESPACE).build();
this.throwables = Lists.newArrayList();
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiCleanableDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiCleanableDatasetFinder.java
index 93479e5..36007b3 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiCleanableDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiCleanableDatasetFinder.java
@@ -16,11 +16,13 @@
*/
package org.apache.gobblin.data.management.retention.profile;
+import com.google.common.base.Optional;
import org.apache.gobblin.data.management.retention.DatasetCleaner;
import java.net.URI;
import java.util.Properties;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.hadoop.fs.FileSystem;
import com.typesafe.config.Config;
@@ -49,7 +51,11 @@ public class MultiCleanableDatasetFinder extends
MultiDatasetFinder {
public MultiCleanableDatasetFinder(FileSystem fs, Properties jobProps) {
- super(fs, jobProps);
+ this(fs,jobProps,new
EventSubmitter.Builder(Optional.absent(),"noMessage").build());
+ }
+
+ public MultiCleanableDatasetFinder(FileSystem fs, Properties jobProps,
EventSubmitter eventSubmitter) {
+ super(fs, jobProps, eventSubmitter);
}
@Override
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiDatasetFinder.java
index d1bb84e..8c38ae7 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/profile/MultiDatasetFinder.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.data.management.retention.profile;
+import com.google.common.base.Optional;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -26,7 +27,7 @@ import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -72,6 +73,10 @@ public abstract class MultiDatasetFinder implements
DatasetsFinder<Dataset> {
@SuppressWarnings({ "rawtypes", "unchecked" })
public MultiDatasetFinder(FileSystem fs, Properties jobProps) {
+ this(fs,jobProps,new
EventSubmitter.Builder(Optional.absent(),"noMessage").build());
+ }
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public MultiDatasetFinder(FileSystem fs, Properties jobProps, EventSubmitter
eventSubmitter) {
this.jobProps = jobProps;
try {
this.datasetFinders = Lists.newArrayList();
@@ -79,10 +84,9 @@ public abstract class MultiDatasetFinder implements
DatasetsFinder<Dataset> {
if (jobProps.containsKey(datasetFinderClassKey())) {
try {
log.info(String.format("Instantiating datasetfinder %s ",
jobProps.getProperty(datasetFinderClassKey())));
- this.datasetFinders.add((DatasetsFinder)
ConstructorUtils.invokeConstructor(
- Class.forName(jobProps.getProperty(datasetFinderClassKey())),
fs, jobProps));
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
- | ClassNotFoundException e) {
+ this.datasetFinders.add((DatasetsFinder)
GobblinConstructorUtils.invokeLongestConstructor(
+ Class.forName(jobProps.getProperty(datasetFinderClassKey())),
fs, jobProps, eventSubmitter));
+ } catch (ReflectiveOperationException e) {
log.error(
String.format("Retention ignored could not instantiate
datasetfinder %s.",
jobProps.getProperty(datasetFinderClassKey())), e);
@@ -106,7 +110,8 @@ public abstract class MultiDatasetFinder implements
DatasetsFinder<Dataset> {
try {
this.datasetFinders.add((DatasetsFinder)
GobblinConstructorUtils.invokeFirstConstructor(
Class.forName(datasetClassConfig.getString(datasetFinderClassKey())),
ImmutableList.of(fs, jobProps,
- datasetClassConfig), ImmutableList.of(fs, jobProps)));
+ datasetClassConfig), ImmutableList.of(fs, jobProps,
eventSubmitter),
+ ImmutableList.of(fs, jobProps)));
log.info(String.format("Instantiated datasetfinder %s for %s.",
datasetClassConfig.getString(datasetFinderClassKey()),
importedBy));
} catch (InstantiationException | IllegalAccessException |
IllegalArgumentException