This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 67c610117 [GOBBLIN-1992] Propagate slf4j MDC Context in workflows and
activities of the tempor… (#3868)
67c610117 is described below
commit 67c610117c2b757bb15714826e0b0edc3e6b26a5
Author: William Lo <[email protected]>
AuthorDate: Mon Jan 29 16:44:27 2024 -0500
[GOBBLIN-1992] Propagate slf4j MDC Context in workflows and activities of
the tempor… (#3868)
Propagate slf4j MDC Context in workflows and activities of the temporal
Gobblin workflows
---
.../ddm/launcher/ProcessWorkUnitsJobLauncher.java | 1 +
.../gobblin/temporal/ddm/work/assistance/Help.java | 8 +++
.../ddm/work/assistance/MDCContextPropagator.java | 76 ++++++++++++++++++++++
.../client/TemporalWorkflowClientFactory.java | 6 +-
4 files changed, 90 insertions(+), 1 deletion(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 88838ca6a..14a4bacf4 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -83,6 +83,7 @@ public class ProcessWorkUnitsJobLauncher extends
GobblinTemporalJobLauncher {
int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(this.jobProps,
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
}
+ Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec,
Help.loadFileSystem(wuSpec)));
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec,
ConfigFactory.parseProperties(jobProps)))
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index f382dcec1..dd47f590f 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -46,6 +46,7 @@ import
org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.SerializationUtils;
+import org.slf4j.MDC;
/** Various capabilities useful in implementing Distributed Data Movement
(DDM) */
@@ -207,4 +208,11 @@ public class Help {
log.info("JobState(numAccesses: {}) - {}", numAccesses,
jobStateByPath.stats());
}
}
+
+ public static void propagateGaaSFlowExecutionContext(JobState jobState) {
+ // TODO: log4j2 has better syntax around conditional logging such that the
key does not need to be included in the value
+ MDC.put(ConfigurationKeys.FLOW_NAME_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_NAME_KEY,
jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY, "<<NOT SET>>")));
+ MDC.put(ConfigurationKeys.FLOW_GROUP_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_GROUP_KEY,
jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY, "<<NOT SET>>")));
+ MDC.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
String.format("%s:%s",ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "<<NOT SET>>")));
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/MDCContextPropagator.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/MDCContextPropagator.java
new file mode 100644
index 000000000..9dfd55f10
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/MDCContextPropagator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.assistance;
+
+import com.google.protobuf.ByteString;
+import io.temporal.api.common.v1.Payload;
+import io.temporal.common.context.ContextPropagator;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.MDC;
+
+
+/**
+ * Context propagator for MDC in Temporal GaaS flows, which is used to share context between workflows
+ * and activities. Keys added to MDC will show up in Temporal headers serialized as protobufs; each
+ * value is carried as the UTF-8 bytes of the MDC string.
+ *
+ * <p>{@link MDC#getCopyOfContextMap()} may return {@code null} when nothing has been put into the MDC,
+ * so a {@code null} context is treated as an empty one throughout this class instead of failing with a
+ * {@link NullPointerException}.
+ *
+ * <p>See documentation at https://docs.temporal.io/dev-guide/go/observability#context-propagators and
+ * https://javadoc.io/doc/io.temporal/temporal-sdk/1.12.0/io/temporal/common/context/ContextPropagator.html
+ */
+@Slf4j
+public class MDCContextPropagator implements ContextPropagator {
+
+  /** @return the fully qualified class name, used as the name of this propagator */
+  @Override
+  public String getName() {
+    return this.getClass().getName();
+  }
+
+  /**
+   * @return a copy of the calling thread's MDC as a {@code Map<String, String>}; never {@code null}
+   *         (an empty map is returned when the MDC has no context map)
+   */
+  @Override
+  public Object getCurrentContext() {
+    Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
+    if (mdcSnapshot == null) {
+      // slf4j allows a null copy when the MDC is empty; hand back an empty map so that
+      // serializeContext / setCurrentContext never see null from this method.
+      return new HashMap<String, String>();
+    }
+    return mdcSnapshot;
+  }
+
+  /**
+   * Puts every entry of {@code context} into the calling thread's MDC.
+   *
+   * @param context a {@code Map<String, String>} as produced by {@link #getCurrentContext()} or
+   *                {@link #deserializeContext(Map)}; {@code null} is ignored
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setCurrentContext(Object context) {
+    if (context == null) {
+      return;
+    }
+    Map<String, String> contextMap = (Map<String, String>) context;
+    for (Map.Entry<String, String> entry : contextMap.entrySet()) {
+      MDC.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Converts an MDC map into Temporal header payloads, one per key.
+   *
+   * @param context a {@code Map<String, String>} as produced by {@link #getCurrentContext()};
+   *                {@code null} yields an empty result
+   * @return a new map from MDC key to a {@link Payload} holding the UTF-8 bytes of the value
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Map<String, Payload> serializeContext(Object context) {
+    Map<String, Payload> serializedContext = new HashMap<>();
+    if (context == null) {
+      return serializedContext;
+    }
+    Map<String, String> contextMap = (Map<String, String>) context;
+    for (Map.Entry<String, String> entry : contextMap.entrySet()) {
+      String value = entry.getValue();
+      if (value == null) {
+        // MDC implementations may accept null values; there are no bytes to send for them.
+        continue;
+      }
+      Payload payload = Payload.newBuilder()
+          .setData(ByteString.copyFrom(value.getBytes(StandardCharsets.UTF_8)))
+          .build();
+      serializedContext.put(entry.getKey(), payload);
+    }
+    return serializedContext;
+  }
+
+  /**
+   * Inverse of {@link #serializeContext(Object)}: decodes every payload as a UTF-8 string.
+   *
+   * @param context header payloads keyed by MDC key; {@code null} yields an empty result
+   * @return a new {@code Map<String, String>} suitable for {@link #setCurrentContext(Object)}
+   */
+  @Override
+  public Object deserializeContext(Map<String, Payload> context) {
+    Map<String, String> contextMap = new HashMap<>();
+    if (context == null) {
+      return contextMap;
+    }
+    for (Map.Entry<String, Payload> entry : context.entrySet()) {
+      contextMap.put(entry.getKey(), entry.getValue().getData().toString(StandardCharsets.UTF_8));
+    }
+    return contextMap;
+  }
+}
\ No newline at end of file
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
index a8873bbeb..030d4aae1 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
@@ -41,6 +41,8 @@ import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
+
public class TemporalWorkflowClientFactory {
public static WorkflowServiceStubs createServiceInstance(String
connectionUri) throws Exception {
@@ -107,7 +109,9 @@ public class TemporalWorkflowClientFactory {
}
public static WorkflowClient createClientInstance(WorkflowServiceStubs
service, String namespace) {
- WorkflowClientOptions options =
WorkflowClientOptions.newBuilder().setNamespace(namespace).build();
+ WorkflowClientOptions options =
WorkflowClientOptions.newBuilder().setNamespace(namespace)
+ .setContextPropagators(Collections.singletonList(new
MDCContextPropagator()))
+ .build();
return WorkflowClient.newInstance(service, options);
}