This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 37e08eb [GOBBLIN-1271] add MultiEventMetadataGenerator
37e08eb is described below
commit 37e08ebc14b67042e8844c9bbe6ca8eccfd6194c
Author: Arjun <[email protected]>
AuthorDate: Wed Sep 23 16:20:22 2020 -0700
[GOBBLIN-1271] add MultiEventMetadataGenerator
Closes #3111 from arjun4084346/multiEventMetadataGenerators
---
.../gobblin/runtime/AbstractJobLauncher.java | 36 +++++-------
.../runtime/api/MultiEventMetadataGenerator.java | 61 ++++++++++++++++++++
.../runtime/MultiEventMetadataGeneratorTest.java | 65 ++++++++++++++++++++++
.../org/apache/gobblin/util/PropertiesUtils.java | 25 +++++++++
.../apache/gobblin/util/PropertiesUtilsTest.java | 11 ++++
5 files changed, 176 insertions(+), 22 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 242daf8..0753f50 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -70,9 +70,9 @@ import org.apache.gobblin.metrics.event.EventName;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.EventMetadataGenerator;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.MultiEventMetadataGenerator;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.job_spec.JobSpecResolver;
import org.apache.gobblin.runtime.listeners.CloseableJobListener;
@@ -91,13 +91,13 @@ import
org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ClusterNameTags;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -166,7 +166,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
private final List<JobListener> mandatoryJobListeners = Lists.newArrayList();
// Used to generate additional metadata to emit in timing events
- private final EventMetadataGenerator eventMetadataGenerator;
+ private final MultiEventMetadataGenerator multiEventMetadataGenerator;
public AbstractJobLauncher(Properties jobProps, List<? extends Tag<?>>
metadataTags)
throws Exception {
@@ -221,17 +221,9 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
JobExecutionEventSubmitter jobExecutionEventSubmitter = new
JobExecutionEventSubmitter(this.eventSubmitter);
this.mandatoryJobListeners.add(new
JobExecutionEventSubmitterListener(jobExecutionEventSubmitter));
- String eventMetadatadataGeneratorClassName =
-
jobProps.getProperty(ConfigurationKeys.EVENT_METADATA_GENERATOR_CLASS_KEY,
- ConfigurationKeys.DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY);
- try {
- ClassAliasResolver<EventMetadataGenerator> aliasResolver =
- new ClassAliasResolver<>(EventMetadataGenerator.class);
- this.eventMetadataGenerator =
aliasResolver.resolveClass(eventMetadatadataGeneratorClassName).newInstance();
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException("Could not construct EventMetadataGenerator
" +
- eventMetadatadataGeneratorClassName, e);
- }
+ this.multiEventMetadataGenerator = new MultiEventMetadataGenerator(
+ PropertiesUtils.getPropAsList(jobProps,
ConfigurationKeys.EVENT_METADATA_GENERATOR_CLASS_KEY,
+ ConfigurationKeys.DEFAULT_EVENT_METADATA_GENERATOR_CLASS_KEY));
} catch (Exception e) {
unlockJob();
throw e;
@@ -413,7 +405,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
} else {
workUnitStream = new
BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
}
-
workUnitsCreationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
+
workUnitsCreationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_CREATION));
if (this.runtimeMetricContext.isPresent()) {
@@ -456,7 +448,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
// important if the current batch of WorkUnits include failed
WorkUnits from the previous
// run which may still have left-over staging data not cleaned up yet.
cleanLeftoverStagingData(workUnitStream, jobState);
-
stagingDataCleanTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
+
stagingDataCleanTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.MR_STAGING_DATA_CLEAN));
long startTime = System.currentTimeMillis();
@@ -505,7 +497,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
});
}
-
workUnitsPreparationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
+
workUnitsPreparationTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_PREPARATION));
// Write job execution info to the job history store before the job
starts to run
@@ -514,7 +506,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
TimingEvent jobRunTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_RUN);
// Start the job and wait for it to finish
runWorkUnitStream(workUnitStream);
-
jobRunTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,EventName.JOB_RUN));
+
jobRunTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,EventName.JOB_RUN));
this.eventSubmitter
.submit(CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.UPPER_CAMEL,
"JOB_" + jobState.getState()));
@@ -529,7 +521,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
this.jobContext.finalizeJobStateBeforeCommit();
this.jobContext.commit();
postProcessJobState(jobState);
-
jobCommitTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
EventName.JOB_COMMIT));
+
jobCommitTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.JOB_COMMIT));
} finally {
long endTime = System.currentTimeMillis();
jobState.setEndTime(endTime);
@@ -544,11 +536,11 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
try {
TimingEvent jobCleanupTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_CLEANUP);
cleanupStagingData(jobState);
-
jobCleanupTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
EventName.JOB_CLEANUP));
+
jobCleanupTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.JOB_CLEANUP));
// Write job execution info to the job history store upon job
termination
this.jobContext.storeJobExecutionInfo();
} finally {
-
launchJobTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
EventName.FULL_JOB_EXECUTION));
+
launchJobTimer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.FULL_JOB_EXECUTION));
if (isWorkUnitsEmpty) {
//If no WorkUnits are created, first send the JobCompleteTimer
event.
notifyListeners(this.jobContext, jobListener,
TimingEvent.LauncherTimings.JOB_COMPLETE, new JobListenerAction() {
@@ -1040,7 +1032,7 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
throw new JobException("Failed to execute all JobListeners", e);
} finally {
LOG.info("Submitting {}", timerEventName);
- timer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
+ timer.stop(this.multiEventMetadataGenerator.getMetadata(this.jobContext,
EventName.getEnumFromEventId(timerEventName)));
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiEventMetadataGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiEventMetadataGenerator.java
new file mode 100644
index 0000000..fd4f721
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiEventMetadataGenerator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.metrics.event.EventName;
+import org.apache.gobblin.runtime.JobContext;
+import org.apache.gobblin.util.ClassAliasResolver;
+
+
+/**
+ * Aggregates the metadata produced by several {@link EventMetadataGenerator}s.
+ *
+ * <p>The generators are named by class name or alias (resolved through {@link ClassAliasResolver}) and are
+ * instantiated once, in list order, when this object is constructed. The list is typically obtained by splitting a
+ * comma-separated configuration value.
+ *
+ * <p>If several generators emit the same metadata key, the value from the generator that appears later in the list
+ * takes precedence.
+ */
+public class MultiEventMetadataGenerator {
+  private final List<EventMetadataGenerator> eventMetadataGenerators = Lists.newArrayList();
+
+  /**
+   * Instantiates every generator named in {@code multiEventMetadataGeneratorList}.
+   *
+   * @param multiEventMetadataGeneratorList class names or aliases of {@link EventMetadataGenerator} implementations,
+   *     in the order their metadata should be applied
+   * @throws RuntimeException if a name cannot be resolved or the class cannot be instantiated through its no-arg
+   *     constructor
+   */
+  public MultiEventMetadataGenerator(List<String> multiEventMetadataGeneratorList) {
+    // The resolver does not depend on the name being resolved, so build it once instead of once per entry.
+    ClassAliasResolver<EventMetadataGenerator> aliasResolver = new ClassAliasResolver<>(EventMetadataGenerator.class);
+    for (String eventMetadatadataGeneratorClassName : multiEventMetadataGeneratorList) {
+      try {
+        // Class#newInstance() is deprecated because it rethrows checked constructor exceptions unchecked; going
+        // through the constructor wraps them in InvocationTargetException, still a ReflectiveOperationException.
+        this.eventMetadataGenerators.add(
+            aliasResolver.resolveClass(eventMetadatadataGeneratorClassName).getDeclaredConstructor().newInstance());
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException("Could not construct EventMetadataGenerator " + eventMetadatadataGeneratorClassName, e);
+      }
+    }
+  }
+
+  /**
+   * Merges the metadata of all configured generators for the given event.
+   *
+   * @param jobContext job context passed through to each generator
+   * @param eventName event for which metadata is requested
+   * @return a new mutable map holding the union of all generators' metadata; on key collisions the later generator
+   *     in the configured list wins
+   */
+  public Map<String, String> getMetadata(JobContext jobContext, EventName eventName) {
+    Map<String, String> metadata = Maps.newHashMap();
+    for (EventMetadataGenerator eventMetadataGenerator : this.eventMetadataGenerators) {
+      // putAll overwrites existing keys, which gives later generators precedence.
+      metadata.putAll(eventMetadataGenerator.getMetadata(jobContext, eventName));
+    }
+    return metadata;
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MultiEventMetadataGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MultiEventMetadataGeneratorTest.java
new file mode 100644
index 0000000..c1b71d3
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MultiEventMetadataGeneratorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime;
+
+import java.util.Map;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.metrics.event.EventName;
+import org.apache.gobblin.runtime.api.EventMetadataGenerator;
+import org.apache.gobblin.runtime.api.MultiEventMetadataGenerator;
+
+
+public class MultiEventMetadataGeneratorTest {
+
+  private static final String FIRST_GENERATOR =
+      "org.apache.gobblin.runtime.MultiEventMetadataGeneratorTest$DummyEventMetadataGenerator";
+  private static final String SECOND_GENERATOR =
+      "org.apache.gobblin.runtime.MultiEventMetadataGeneratorTest$DummyEventMetadataGenerator2";
+
+  /**
+   * Builds a generator from both dummies and checks the merged metadata: each key appears once, and for the shared
+   * key {@code dummyKey12} the value from the generator listed last wins.
+   */
+  @Test
+  public void testInstantiate() {
+    JobContext context = Mockito.mock(JobContext.class);
+    MultiEventMetadataGenerator generator =
+        new MultiEventMetadataGenerator(ImmutableList.of(FIRST_GENERATOR, SECOND_GENERATOR));
+    EventName jobComplete = EventName.getEnumFromEventId("JobCompleteTimer");
+
+    Map<String, String> merged = generator.getMetadata(context, jobComplete);
+
+    Assert.assertEquals(merged.size(), 3);
+    Assert.assertEquals(merged.get("dummyKey11"), "dummyValue11");
+    Assert.assertEquals(merged.get("dummyKey12"), "dummyValue22");
+    Assert.assertEquals(merged.get("dummyKey21"), "dummyValue21");
+  }
+
+  /** First generator: contributes {@code dummyKey11} and an initial value for {@code dummyKey12}. */
+  public static class DummyEventMetadataGenerator implements EventMetadataGenerator {
+    @Override
+    public Map<String, String> getMetadata(JobContext jobContext, EventName eventName) {
+      return ImmutableMap.of("dummyKey11", "dummyValue11", "dummyKey12", "dummyValue12");
+    }
+  }
+
+  /** Second generator: contributes {@code dummyKey21} and a competing value for {@code dummyKey12}. */
+  public static class DummyEventMetadataGenerator2 implements EventMetadataGenerator {
+    @Override
+    public Map<String, String> getMetadata(JobContext jobContext, EventName eventName) {
+      return ImmutableMap.of("dummyKey21", "dummyValue21", "dummyKey12", "dummyValue22");
+    }
+  }
+}
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
index 893c102..0eb8650 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.util;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -28,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
@@ -36,6 +38,8 @@ import com.google.common.collect.ImmutableMap;
*/
public class PropertiesUtils {
+  // Comma splitter: trims whitespace around each entry and omits entries that end up empty.
+  private static final Splitter LIST_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();
+
/**
* Combine a variable number of {@link Properties} into a single {@link
Properties}.
*/
@@ -71,6 +75,27 @@ public class PropertiesUtils {
}
/**
+   * Get the value of a comma-separated property as a {@link List} of strings.
+   *
+   * <p>Entries are trimmed and empty entries are dropped. An unset property yields an empty list.
+   *
+   * @param properties properties to read from
+   * @param key property key
+   * @return value associated with the key as a {@link List} of strings; empty if the property is not set
+   */
+  public static List<String> getPropAsList(Properties properties, String key) {
+    // Treat an unset property like an empty value instead of letting Splitter throw on null.
+    return getPropAsList(properties, key, StringUtils.EMPTY);
+  }
+
+  /**
+   * Get the value of a comma-separated property as a {@link List} of strings, using the given default value if the
+   * property is not set.
+   *
+   * <p>Entries are trimmed and empty entries are dropped. If the property is not set and {@code def} is
+   * {@code null}, the result is an empty list.
+   *
+   * @param properties properties to read from
+   * @param key property key
+   * @param def comma-separated default value; may be {@code null}
+   * @return value (the default value if the property is not set) associated with the key as a list of strings
+   */
+  public static List<String> getPropAsList(Properties properties, String key, String def) {
+    // Splitter#splitToList rejects null; map a missing value to "", which splits to an empty list.
+    return LIST_SPLITTER.splitToList(StringUtils.defaultString(properties.getProperty(key, def)));
+  }
+
+ /**
* Extract all the keys that start with a <code>prefix</code> in {@link
Properties} to a new {@link Properties}
* instance.
*
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
index 87f6828..69c95c2 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/PropertiesUtilsTest.java
@@ -22,6 +22,7 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
public class PropertiesUtilsTest {
@@ -52,4 +53,14 @@ public class PropertiesUtilsTest {
Assert.assertTrue(!extractedPropertiesK3.containsKey("k1.kk1"));
Assert.assertTrue(!extractedPropertiesK3.containsKey("k2.kk"));
}
+
+  @Test
+  public void testGetStringList() {
+    Properties props = new Properties();
+    props.setProperty("key", "1,2, 3");
+
+    // A set property is split on commas, with whitespace around each entry trimmed.
+    Assert.assertEquals(PropertiesUtils.getPropAsList(props, "key"), ImmutableList.of("1", "2", "3"));
+    // "key2" is not set, so the supplied default is split instead.
+    Assert.assertEquals(PropertiesUtils.getPropAsList(props, "key2", "default"), ImmutableList.of("default"));
+  }
}