This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 67c610117 [GOBBLIN-1992] Propagate slf4j MDC Context in workflows and 
activities of the tempor… (#3868)
67c610117 is described below

commit 67c610117c2b757bb15714826e0b0edc3e6b26a5
Author: William Lo <[email protected]>
AuthorDate: Mon Jan 29 16:44:27 2024 -0500

    [GOBBLIN-1992] Propagate slf4j MDC Context in workflows and activities of 
the tempor… (#3868)
    
    Propagate slf4j MDC Context in workflows and activities of the temporal 
Gobblin workflows
---
 .../ddm/launcher/ProcessWorkUnitsJobLauncher.java  |  1 +
 .../gobblin/temporal/ddm/work/assistance/Help.java |  8 +++
 .../ddm/work/assistance/MDCContextPropagator.java  | 76 ++++++++++++++++++++++
 .../client/TemporalWorkflowClientFactory.java      |  6 +-
 4 files changed, 90 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
index 88838ca6a..14a4bacf4 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ProcessWorkUnitsJobLauncher.java
@@ -83,6 +83,7 @@ public class ProcessWorkUnitsJobLauncher extends 
GobblinTemporalJobLauncher {
         int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(this.jobProps, 
GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
         wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
       }
+      Help.propagateGaaSFlowExecutionContext(Help.loadJobState(wuSpec, 
Help.loadFileSystem(wuSpec)));
       WorkflowOptions options = WorkflowOptions.newBuilder()
           .setTaskQueue(this.queueName)
           .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, 
ConfigFactory.parseProperties(jobProps)))
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index f382dcec1..dd47f590f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -46,6 +46,7 @@ import 
org.apache.gobblin.temporal.ddm.work.styles.FileSystemApt;
 import org.apache.gobblin.temporal.ddm.work.styles.JobStateful;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.SerializationUtils;
+import org.slf4j.MDC;
 
 
 /** Various capabilities useful in implementing Distributed Data Movement 
(DDM) */
@@ -207,4 +208,11 @@ public class Help {
       log.info("JobState(numAccesses: {}) - {}", numAccesses, 
jobStateByPath.stats());
     }
   }
+
+  /**
+   * Publishes the GaaS flow identifiers held by {@code jobState} into the slf4j {@link MDC}.
+   *
+   * <p>For each of {@link ConfigurationKeys#FLOW_NAME_KEY}, {@link ConfigurationKeys#FLOW_GROUP_KEY} and
+   * {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY}, the MDC entry stored under that key is the string
+   * {@code "<key>:<value>"}, where the value is read via {@code jobState.getProp(key, "<<NOT SET>>")}.
+   *
+   * @param jobState the job state whose flow properties are read
+   */
+  public static void propagateGaaSFlowExecutionContext(JobState jobState) {
+    // TODO: log4j2 has better syntax around conditional logging such that the key does not need to be included in the value
+    String[] flowContextKeys = {
+        ConfigurationKeys.FLOW_NAME_KEY,
+        ConfigurationKeys.FLOW_GROUP_KEY,
+        ConfigurationKeys.FLOW_EXECUTION_ID_KEY
+    };
+    for (String flowContextKey : flowContextKeys) {
+      String flowContextValue = jobState.getProp(flowContextKey, "<<NOT SET>>");
+      MDC.put(flowContextKey, String.format("%s:%s", flowContextKey, flowContextValue));
+    }
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/MDCContextPropagator.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/MDCContextPropagator.java
new file mode 100644
index 000000000..9dfd55f10
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/MDCContextPropagator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work.assistance;
+
+import com.google.protobuf.ByteString;
+import io.temporal.api.common.v1.Payload;
+import io.temporal.common.context.ContextPropagator;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.MDC;
+
+
+/**
+ * Context propagator for MDC in Temporal GaaS flows, which is used to share context between workflows and activities.
+ * Keys added to MDC will show up in Temporal headers serialized as protobufs.
+ * See documentation at https://docs.temporal.io/dev-guide/go/observability#context-propagators and
+ * https://javadoc.io/doc/io.temporal/temporal-sdk/1.12.0/io/temporal/common/context/ContextPropagator.html
+ *
+ * <p>The propagator tolerates an absent MDC: slf4j documents that {@link MDC#getCopyOfContextMap()} may return
+ * {@code null}, so a missing context is treated as an empty one instead of failing with a
+ * {@link NullPointerException}.
+ */
+@Slf4j
+public class MDCContextPropagator implements ContextPropagator {
+
+  /** @return the fully-qualified class name, used as this propagator's name */
+  @Override
+  public String getName() {
+    return this.getClass().getName();
+  }
+
+  /**
+   * Snapshots the current MDC.
+   *
+   * @return a copy of the MDC entries as a {@code Map<String, String>}; an empty map (never {@code null}) when the
+   *         MDC has no context map
+   */
+  @Override
+  public Object getCurrentContext() {
+    Map<String, String> mdcSnapshot = MDC.getCopyOfContextMap();
+    // getCopyOfContextMap() may return null; normalise so serializeContext() never sees null
+    return mdcSnapshot != null ? mdcSnapshot : new HashMap<String, String>();
+  }
+
+  /**
+   * Adds every entry of {@code context} to the MDC; existing MDC keys not present in {@code context} are left
+   * untouched.
+   *
+   * @param context a {@code Map<String, String>} as produced by {@link #deserializeContext(Map)}; {@code null} is
+   *                treated as empty
+   */
+  @Override
+  @SuppressWarnings("unchecked") // the context object is always the Map<String, String> produced by this class
+  public void setCurrentContext(Object context) {
+    if (context == null) {
+      return;
+    }
+    Map<String, String> contextMap = (Map<String, String>) context;
+    for (Map.Entry<String, String> entry : contextMap.entrySet()) {
+      MDC.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Encodes each context value as a UTF-8 {@link Payload} keyed by its MDC key.
+   *
+   * @param context a {@code Map<String, String>} as produced by {@link #getCurrentContext()}; {@code null} is
+   *                treated as empty
+   * @return a new map of payloads; entries whose value is {@code null} are omitted because they cannot be encoded
+   */
+  @Override
+  @SuppressWarnings("unchecked") // the context object is always the Map<String, String> produced by this class
+  public Map<String, Payload> serializeContext(Object context) {
+    Map<String, Payload> serializedContext = new HashMap<>();
+    if (context == null) {
+      return serializedContext;
+    }
+    Map<String, String> contextMap = (Map<String, String>) context;
+    for (Map.Entry<String, String> entry : contextMap.entrySet()) {
+      String value = entry.getValue();
+      if (value == null) {
+        // a null MDC value has no byte representation; skip it rather than throw
+        continue;
+      }
+      Payload payload = Payload.newBuilder().setData(ByteString.copyFrom(value.getBytes(StandardCharsets.UTF_8))).build();
+      serializedContext.put(entry.getKey(), payload);
+    }
+    return serializedContext;
+  }
+
+  /**
+   * Decodes the payloads written by {@link #serializeContext(Object)} back into strings.
+   *
+   * @param context payloads keyed by MDC key; {@code null} is treated as empty
+   * @return a new {@code Map<String, String>} holding the UTF-8 decoded values
+   */
+  @Override
+  public Object deserializeContext(Map<String, Payload> context) {
+    Map<String, String> contextMap = new HashMap<>();
+    if (context == null) {
+      return contextMap;
+    }
+    for (Map.Entry<String, Payload> entry : context.entrySet()) {
+      contextMap.put(entry.getKey(), entry.getValue().getData().toString(StandardCharsets.UTF_8));
+    }
+    return contextMap;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
index a8873bbeb..030d4aae1 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/client/TemporalWorkflowClientFactory.java
@@ -41,6 +41,8 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManagerFactory;
 
 import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.temporal.ddm.work.assistance.MDCContextPropagator;
+
 
 public class TemporalWorkflowClientFactory {
     public static WorkflowServiceStubs createServiceInstance(String 
connectionUri) throws Exception {
@@ -107,7 +109,9 @@ public class TemporalWorkflowClientFactory {
     }
 
     public static WorkflowClient createClientInstance(WorkflowServiceStubs 
service, String namespace) {
-        WorkflowClientOptions options = 
WorkflowClientOptions.newBuilder().setNamespace(namespace).build();
+        WorkflowClientOptions options = 
WorkflowClientOptions.newBuilder().setNamespace(namespace)
+            .setContextPropagators(Collections.singletonList(new 
MDCContextPropagator()))
+            .build();
         return WorkflowClient.newInstance(service, options);
     }
 

Reply via email to