This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 37e08eb  [GOBBLIN-1271] add MultiEventMetadataGenerator
37e08eb is described below

commit 37e08ebc14b67042e8844c9bbe6ca8eccfd6194c
Author: Arjun <[email protected]>
AuthorDate: Wed Sep 23 16:20:22 2020 -0700

    [GOBBLIN-1271] add MultiEventMetadataGenerator
    
    Closes #3111 from
    arjun4084346/multiEventMetadataGenerators
---
 .../gobblin/runtime/AbstractJobLauncher.java       | 36 +++++-------
 .../runtime/api/MultiEventMetadataGenerator.java   | 61 ++++++++++++++++++++
 .../runtime/MultiEventMetadataGeneratorTest.java   | 65 ++++++++++++++++++++++
 .../org/apache/gobblin/util/PropertiesUtils.java   | 25 +++++++++
 .../apache/gobblin/util/PropertiesUtilsTest.java   | 11 ++++
 5 files changed, 176 insertions(+), 22 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 242daf8..0753f50 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -70,9 +70,9 @@ import org.apache.gobblin.metrics.event.EventName;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.EventMetadataGenerator;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.MultiEventMetadataGenerator;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
 import org.apache.gobblin.runtime.listeners.CloseableJobListener;
@@ -91,13 +91,13 @@ import 
org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ClusterNameTags;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
 
@@ -166,7 +166,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
   private final List<JobListener> mandatoryJobListeners = Lists.newArrayList();
 
   // Used to generate additional metadata to emit in timing events
-  private final EventMetadataGenerator eventMetadataGenerator;
+  private final MultiEventMetadataGenerator multiEventMetadataGenerator;
 
   public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>> 
metadataTags)
       throws Exception {
@@ -221,17 +221,9 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
       JobExecutionEventSubmitter jobExecutionEventSubmitter = new 
JobExecutionEventSubmitter(this.eventSubmitter);
       this.mandatoryJobListeners.add(new 
JobExecutionEventSubmitterListener(jobExecutionEventSubmitter));
 
-      String eventMetadatadataGeneratorClassName =
-          
jobProps.getProperty(ConfigurationKeys.EVENT_METADATA_GENERATOR_CLASS_KEY,
-              ConfigurationKeys.DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY);
-      try {
-        ClassAliasResolver<EventMetadataGenerator> aliasResolver =
-            new ClassAliasResolver<>(EventMetadataGenerator.class);
-        this.eventMetadataGenerator = 
aliasResolver.resolveClass(eventMetadatadataGeneratorClassName).newInstance();
-      } catch (ReflectiveOperationException e) {
-        throw new RuntimeException("Could not construct EventMetadataGenerator 
" +
-            eventMetadatadataGeneratorClassName, e);
-      }
+      this.multiEventMetadataGenerator = new MultiEventMetadataGenerator(
+          PropertiesUtils.getPropAsList(jobProps, 
ConfigurationKeys.EVENT_METADATA_GENERATOR_CLASS_KEY,
+              ConfigurationKeys.DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY));
     } catch (Exception e) {
       unlockJob();
       throw e;
@@ -413,7 +405,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         } else {
           workUnitStream = new 
BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
         }
-        
workUnitsCreationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
+        
workUnitsCreationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
             EventName.WORK_UNITS_CREATION));
 
         if (this.runtimeMetricContext.isPresent()) {
@@ -456,7 +448,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         // important if the current batch of WorkUnits include failed 
WorkUnits from the previous
         // run which may still have left-over staging data not cleaned up yet.
         cleanLeftoverStagingData(workUnitStream, jobState);
-        
stagingDataCleanTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
+        
stagingDataCleanTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
             EventName.MR_STAGING_DATA_CLEAN));
 
         long startTime = System.currentTimeMillis();
@@ -505,7 +497,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
             });
           }
 
-          
workUnitsPreparationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
+          
workUnitsPreparationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
               EventName.WORK_UNITS_PREPARATION));
 
           // Write job execution info to the job history store before the job 
starts to run
@@ -514,7 +506,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           TimingEvent jobRunTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_RUN);
           // Start the job and wait for it to finish
           runWorkUnitStream(workUnitStream);
-          
jobRunTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,EventName.JOB_RUN));
+          
jobRunTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,EventName.JOB_RUN));
 
           this.eventSubmitter
               .submit(CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL, 
"JOB_" + jobState.getState()));
@@ -529,7 +521,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           this.jobContext.finalizeJobStateBeforeCommit();
           this.jobContext.commit();
           postProcessJobState(jobState);
-          
jobCommitTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, 
EventName.JOB_COMMIT));
+          
jobCommitTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
 EventName.JOB_COMMIT));
         } finally {
           long endTime = System.currentTimeMillis();
           jobState.setEndTime(endTime);
@@ -544,11 +536,11 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         try {
           TimingEvent jobCleanupTimer = 
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP);
           cleanupStagingData(jobState);
-          
jobCleanupTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, 
EventName.JOB_CLEANUP));
+          
jobCleanupTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
 EventName.JOB_CLEANUP));
           // Write job execution info to the job history store upon job 
termination
           this.jobContext.storeJobExecutionInfo();
         } finally {
-          
launchJobTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext, 
EventName.FULL_JOB_EXECUTION));
+          
launchJobTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
 EventName.FULL_JOB_EXECUTION));
           if (isWorkUnitsEmpty) {
             //If no WorkUnits are created, first send the JobCompleteTimer 
event.
             notifyListeners(this.jobContext, jobListener, 
TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
@@ -1040,7 +1032,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
       throw new JobException("Failed to execute all JobListeners", e);
     } finally {
       LOG.info("Submitting {}", timerEventName);
-      timer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
+      timer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
           EventName.getEnumFromEventId(timerEventName)));
     }
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiEventMetadataGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiEventMetadataGenerator.java
new file mode 100644
index 0000000..fd4f721
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiEventMetadataGenerator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Maps;
+
+import avro.shaded.com.google.common.collect.Lists;
+
+import org.apache.gobblin.metrics.event.EventName;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.util.ClassAliasResolver;
+
+
+/**
+ * A class to add metadata from multiple {@link EventMetadataGenerator}s.
+ * {@link EventMetadataGenerator}s are supposed to be provided by a comma 
separated string.
+ * If multiple {@link EventMetadataGenerator}s add the same metadata, the one 
that comes later will take precedence.
+ */
+public class MultiEventMetadataGenerator {
+  private final List<EventMetadataGenerator> eventMetadataGenerators = 
Lists.newArrayList();
+
+  public MultiEventMetadataGenerator(List<String> 
multiEventMetadataGeneratorList) {
+    for (String eventMetadatadataGeneratorClassName : 
multiEventMetadataGeneratorList) {
+      try {
+        ClassAliasResolver<EventMetadataGenerator> aliasResolver = new 
ClassAliasResolver<>(EventMetadataGenerator.class);
+        
this.eventMetadataGenerators.add(aliasResolver.resolveClass(eventMetadatadataGeneratorClassName).newInstance());
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException("Could not construct EventMetadataGenerator 
" + eventMetadatadataGeneratorClassName, e);
+      }
+    }
+  }
+
+  public Map<String, String> getMetadata(JobContext jobContext, EventName 
eventName) {
+    Map<String, String> metadata = Maps.newHashMap();
+
+    for (EventMetadataGenerator eventMetadataGenerator : 
eventMetadataGenerators) {
+      metadata.putAll(eventMetadataGenerator.getMetadata(jobContext, 
eventName));
+    }
+
+    return metadata;
+  }
+}
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MultiEventMetadataGeneratorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MultiEventMetadataGeneratorTest.java
new file mode 100644
index 0000000..c1b71d3
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MultiEventMetadataGeneratorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.util.Map;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.metrics.event.EventName;
+import org.apache.gobblin.runtime.api.EventMetadataGenerator;
+import org.apache.gobblin.runtime.api.MultiEventMetadataGenerator;
+
+
+public class MultiEventMetadataGeneratorTest {
+
+  @Test
+  public void testInstantiate() {
+    JobContext jobContext = Mockito.mock(JobContext.class);
+    MultiEventMetadataGenerator multiEventMetadataGenerator = new 
MultiEventMetadataGenerator(ImmutableList.of(
+        
"org.apache.gobblin.runtime.MultiEventMetadataGeneratorTest$DummyEventMetadataGenerator",
+        
"org.apache.gobblin.runtime.MultiEventMetadataGeneratorTest$DummyEventMetadataGenerator2"));
+
+    Map<String, String> metadata = 
multiEventMetadataGenerator.getMetadata(jobContext, 
EventName.getEnumFromEventId("JobCompleteTimer"));
+    Assert.assertEquals(metadata.size(), 3);
+    Assert.assertEquals(metadata.get("dummyKey11"), "dummyValue11");
+    Assert.assertEquals(metadata.get("dummyKey12"), "dummyValue22");
+    Assert.assertEquals(metadata.get("dummyKey21"), "dummyValue21");
+  }
+
+  public static class DummyEventMetadataGenerator implements 
EventMetadataGenerator {
+
+    @Override
+    public Map<String, String> getMetadata(JobContext jobContext, EventName 
eventName) {
+      return ImmutableMap.of("dummyKey11", "dummyValue11", "dummyKey12", 
"dummyValue12");
+    }
+  }
+
+  public static class DummyEventMetadataGenerator2 implements 
EventMetadataGenerator {
+
+    @Override
+    public Map<String, String> getMetadata(JobContext jobContext, EventName 
eventName) {
+      return ImmutableMap.of("dummyKey21", "dummyValue21", "dummyKey12", 
"dummyValue22");
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 893c102..0eb8650 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.util;
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -28,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 
 
@@ -36,6 +38,8 @@ import com.google.common.collect.ImmutableMap;
  */
 public class PropertiesUtils {
 
+  private static final Splitter LIST_SPLITTER = 
Splitter.on(",").trimResults().omitEmptyStrings();
+
   /**
    * Combine a variable number of {@link Properties} into a single {@link 
Properties}.
    */
@@ -71,6 +75,27 @@ public class PropertiesUtils {
   }
 
   /**
+   * Get the value of a comma separated property as a {@link List} of strings.
+   *
+   * @param key property key
+   * @return value associated with the key as a {@link List} of strings
+   */
+  public static List<String> getPropAsList(Properties properties, String key) {
+    return LIST_SPLITTER.splitToList(properties.getProperty(key));
+  }
+
+  /**
+   * Get the value of a property as a list of strings, using the given default 
value if the property is not set.
+   *
+   * @param key property key
+   * @param def default value
+   * @return value (the default value if the property is not set) associated 
with the key as a list of strings
+   */
+  public static List<String> getPropAsList(Properties properties, String key, 
String def) {
+    return LIST_SPLITTER.splitToList(properties.getProperty(key, def));
+  }
+
+  /**
    * Extract all the keys that start with a <code>prefix</code> in {@link 
Properties} to a new {@link Properties}
    * instance.
    *
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
index 87f6828..69c95c2 100644
--- 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
@@ -22,6 +22,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 
 
 public class PropertiesUtilsTest {
@@ -52,4 +53,14 @@ public class PropertiesUtilsTest {
     Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
     Assert.assertTrue(!extractedPropertiesK3.containsKey("k2.kk"));
   }
+
+  @Test
+  public void testGetStringList() {
+    Properties properties = new Properties();
+    properties.put("key", "1,2, 3");
+
+    // values as comma separated strings
+    Assert.assertEquals(PropertiesUtils.getPropAsList(properties, "key"), 
ImmutableList.of("1", "2", "3"));
+    Assert.assertEquals(PropertiesUtils.getPropAsList(properties, "key2", 
"default"), ImmutableList.of("default"));
+  }
 }

Reply via email to