scwhittle commented on code in PR #37662:
URL: https://github.com/apache/beam/pull/37662#discussion_r2894914974


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -131,28 +222,315 @@ public DataflowWorkerLoggingHandler(String filename, 
long sizeLimit) throws IOEx
     createOutputStream();
   }
 
-  public synchronized void setLogMdc(boolean enabled) {
-    this.logCustomMdc = enabled;
+  public void setLogMdc(boolean enabled) {
+    logCustomMdc = enabled;
   }
 
-  @Override
-  public synchronized void publish(LogRecord record) {
-    DataflowExecutionState currrentDataflowState = null;
-    ExecutionState currrentState = 
ExecutionStateTracker.getCurrentExecutionState();
-    if (currrentState instanceof DataflowExecutionState) {
-      currrentDataflowState = (DataflowExecutionState) currrentState;
-    }
-    // It's okay to pass in the null state, publish() handles and tests this.
-    publish(currrentDataflowState, record);
+  private void initializeLabelMaps(PipelineOptions options) {
+    DataflowPipelineOptions dataflowOptions = 
options.as(DataflowPipelineOptions.class);
+    DataflowWorkerHarnessOptions harnessOptions = 
options.as(DataflowWorkerHarnessOptions.class);
+    @Nullable String jobId = harnessOptions.getJobId();
+    if (jobId == null || jobId.isEmpty()) {
+      jobId = GceMetadataUtil.fetchDataflowJobId();
+    }
+    jobId = middleCrop(jobId, LABEL_MAX_LENGTH);
+
+    @Nullable String jobName = dataflowOptions.getJobName();
+    if (jobName == null || jobName.isEmpty()) {
+      jobName = GceMetadataUtil.fetchDataflowJobName();
+    }
+    jobName = middleCrop(jobName, LABEL_MAX_LENGTH);
+
+    String region = dataflowOptions.getRegion();
+    if (region.isEmpty()) {
+      region = GceMetadataUtil.fetchDataflowRegion();
+    }
+    region = middleCrop(region, LABEL_MAX_LENGTH);
+
+    // Note that the id in the options is a string name, not the numeric VM id.
+    @Nullable String workerName = harnessOptions.getWorkerId();
+    if (workerName == null || workerName.isEmpty()) {
+      workerName = GceMetadataUtil.fetchDataflowWorkerName();
+    }
+    workerName = middleCrop(workerName, LABEL_MAX_LENGTH);
+
+    String workerId = middleCrop(GceMetadataUtil.fetchDataflowWorkerId(), 
LABEL_MAX_LENGTH);
+
+    @Nullable String projectId = harnessOptions.getProject();
+    if (projectId == null || projectId.isEmpty()) {
+      projectId = checkNotNull(ServiceOptions.getDefaultProjectId());
+    }
+    projectId = middleCrop(projectId, LABEL_MAX_LENGTH);
+
+    ImmutableMap.Builder<String, String> defaultLabelsBuilder = new 
ImmutableMap.Builder<>();
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_type", 
"instance");
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_name", 
workerName);
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_id", workerId);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/region", region);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/job_name", jobName);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/job_id", jobId);
+    defaultLabels = defaultLabelsBuilder.buildOrThrow();
+
+    ImmutableMap.Builder<String, String> resourceLabelBuilder = new 
ImmutableMap.Builder<>();
+    resourceLabelBuilder.put("job_id", jobId);
+    resourceLabelBuilder.put("job_name", jobName);
+    resourceLabelBuilder.put("project_id", projectId);
+    resourceLabelBuilder.put("region", region);
+    // We add the step when constructing the resource as it can change.
+    defaultResourceLabels = resourceLabelBuilder.buildOrThrow();
+    steplessMonitoredResource =
+        MonitoredResource.newBuilder(RESOURCE_TYPE)
+            .setLabels(defaultResourceLabels)
+            .addLabel(STEP_RESOURCE_LABEL, "")
+            .build();
+  }
+
+  private static String middleCrop(String value, int maxSize) {
+    if (value.length() <= maxSize) {
+      return value;
+    }
+    if (maxSize < 3) {
+      return value.substring(0, maxSize);
+    }
+    int firstHalfSize = (maxSize - 2) / 2;
+    int secondHalfSize = (maxSize - 3) / 2;
+    return value.substring(0, firstHalfSize)
+        + "..."
+        + value.substring(value.length() - secondHalfSize);
   }
 
-  public synchronized void publish(DataflowExecutionState 
currentExecutionState, LogRecord record) {
-    if (!isLoggable(record)) {
+  private static Severity severityFor(Level level) {
+    if (level instanceof LoggingLevel) {
+      return ((LoggingLevel) level).getSeverity();
+    }
+    // Choose the severity based on Level range, rounding down.
+    // The assumption is that Level values below maintain same numeric value
+    int value = level.intValue();
+    if (value < Level.INFO.intValue()) {
+      // Includes Level.CONFIG, Level.FINE, Level.FINER, Level.FINEST
+      return Severity.DEBUG;
+    } else if (value < Level.WARNING.intValue()) {
+      return Severity.INFO;
+    } else if (value < Level.SEVERE.intValue()) {
+      return Severity.WARNING;
+    } else if (value < Level.OFF.intValue()) {
+      return Severity.ERROR;
+    }
+    // Level.OFF is special meaning not to log.
+    return Severity.NONE;
+  }
+
+  private void addLogField(
+      Struct.Builder builder, String field, @Nullable String value, int 
maxSize) {
+    if (value == null || value.isEmpty()) {
       return;
     }
+    builder.putFieldsBuilderIfAbsent(field).setStringValue(middleCrop(value, 
maxSize));
+  }
+
+  @VisibleForTesting
+  LogEntry constructDirectLogEntry(
+      LogRecord record, @Nullable DataflowExecutionState executionState) {
+    Struct.Builder payloadBuilder = Struct.newBuilder();
+    addLogField(payloadBuilder, "logger", record.getLoggerName(), 
FIELD_MAX_LENGTH);
+    addLogField(
+        payloadBuilder, "message", getFormatter().formatMessage(record), 
MESSAGE_MAX_LENGTH);
+    addLogField(
+        payloadBuilder, "exception", formatException(record.getThrown()), 
MESSAGE_MAX_LENGTH);
+    addLogField(payloadBuilder, "thread", 
String.valueOf(record.getThreadID()), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "stage", 
DataflowWorkerLoggingMDC.getStageName(), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "worker", 
DataflowWorkerLoggingMDC.getWorkerId(), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "work", DataflowWorkerLoggingMDC.getWorkId(), 
FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "job", DataflowWorkerLoggingMDC.getJobId(), 
FIELD_MAX_LENGTH);
+    if (logCustomMdc) {
+      @Nullable Map<String, String> mdcMap = MDC.getCopyOfContextMap();
+      if (mdcMap != null && !mdcMap.isEmpty()) {
+        Struct.Builder customDataBuilder =
+            
payloadBuilder.putFieldsBuilderIfAbsent("custom_data").getStructValueBuilder();
+        mdcMap.entrySet().stream()
+            .sorted(Map.Entry.comparingByKey())
+            .forEach(
+                (entry) -> {
+                  if (entry.getKey() != null) {
+                    addLogField(
+                        customDataBuilder, entry.getKey(), entry.getValue(), 
FIELD_MAX_LENGTH);
+                  }
+                });
+      }
+    }
+
+    @Nullable String stepId = null;
+    if (executionState != null) {
+      @Nullable NameContext nameContext = executionState.getStepName();
+      if (nameContext != null) {
+        stepId = nameContext.userName();
+      }
+    }
+    addLogField(payloadBuilder, "step", stepId, FIELD_MAX_LENGTH);
+
+    LogEntry.Builder builder =
+        LogEntry.newBuilder(Payload.JsonPayload.of(payloadBuilder.build()))
+            .setTimestamp(Instant.ofEpochMilli(record.getMillis()))
+            .setSeverity(severityFor(record.getLevel()));
+
+    if (stepId != null) {
+      builder.setResource(
+          MonitoredResource.newBuilder(RESOURCE_TYPE)
+              .setLabels(defaultResourceLabels)
+              .addLabel(STEP_RESOURCE_LABEL, stepId)
+              .build());
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Enables logging of records directly to Cloud Logging instead of logging 
to the disk. Should
+   * only be called once.
+   *
+   * @param pipelineOptions the pipelineOptions, used to configure buffers etc.
+   * @param defaultNonDirectLogLevel the default level at which logs should be 
logged to disk.
+   *     LogEntries that are below this level will be logged directly to Cloud 
Logging. The behavior
+   *     for a specific LogEntry can be overridden by attaching a 
ResourceBundle obtained from
+   *     resourceBundleForNonDirectLogLevelHint to it.
+   */
+  synchronized void enableDirectLogging(
+      PipelineOptions pipelineOptions,
+      Level defaultNonDirectLogLevel,
+      @Nullable Consumer<LogEntry> testDirectLogInterceptor) {
+    checkState(
+        directLogging == null && this.testDirectLogInterceptor == null,
+        "enableDirectLogging should only be called once on a 
DataflowWorkerLoggingHandler");
+    initializeLabelMaps(pipelineOptions);
+
+    DataflowWorkerLoggingOptions dfLoggingOptions =
+        pipelineOptions.as(DataflowWorkerLoggingOptions.class);
+    this.fallbackDirectErrorsToDisk =
+        
Boolean.TRUE.equals(dfLoggingOptions.getDirectLoggingFallbackToDiskOnErrors());
+    directThrottler.setCooldownDuration(
+        
Duration.ofSeconds(dfLoggingOptions.getDirectLoggingCooldownSeconds()));
+
+    if (testDirectLogInterceptor == null) {
+      // Override some of the default settings.
+      LoggingOptions cloudLoggingOptions =
+          LoggingOptions.newBuilder()
+              .setBatchingSettings(
+                  BatchingSettings.newBuilder()
+                      .setFlowControlSettings(
+                          FlowControlSettings.newBuilder()
+                              .setLimitExceededBehavior(
+                                  
FlowController.LimitExceededBehavior.ThrowException)
+                              .setMaxOutstandingRequestBytes(
+                                  
dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit())
+                              .setMaxOutstandingElementCount(
+                                  
dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit())
+                              .build())
+                      // These thresholds match the default settings in
+                      // LoggingServiceV2StubSettings.java for 
writeLogEntries.  We must re-set them
+                      // when we override the flow control settings because 
otherwise
+                      // BatchingSettings defaults to no batching.
+                      .setElementCountThreshold(
+                          Math.min(
+                              1000L,
+                              Math.max(
+                                  1L,
+                                  
dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit() / 2)))
+                      .setRequestByteThreshold(
+                          Math.min(
+                              1048576L,
+                              Math.max(
+                                  1, 
dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit() / 2)))
+                      .setDelayThresholdDuration(Duration.ofMillis(50L))
+                      .build())
+              .build();
+      ArrayList<Logging.WriteOption> writeOptions = new ArrayList<>();
+      writeOptions.add(Logging.WriteOption.labels(defaultLabels));
+      writeOptions.add(Logging.WriteOption.logName(LOG_TYPE));
+      
writeOptions.add(Logging.WriteOption.resource(steplessMonitoredResource));
+      this.directWriteOptions = Iterables.toArray(writeOptions, 
Logging.WriteOption.class);
+
+      Logging directLogging = cloudLoggingOptions.getService();
+      directLogging.setFlushSeverity(Severity.NONE);
+      directLogging.setWriteSynchronicity(Synchronicity.ASYNC);
+
+      this.directLogging = directLogging;
+    } else {
+      this.testDirectLogInterceptor = testDirectLogInterceptor;
+    }
+    this.defaultNonDirectLogLevel = defaultNonDirectLogLevel;
+  }
+
+  private static @Nullable DataflowExecutionState 
getCurrentDataflowExecutionState() {
+    @Nullable ExecutionState currentState = 
ExecutionStateTracker.getCurrentExecutionState();
+    if (!(currentState instanceof DataflowExecutionState)) {
+      return null;
+    }
+    return (DataflowExecutionState) currentState;
+  }
+
+  @Override
+  public void publish(@Nullable LogRecord record) {
+    if (record == null) {
+      return;
+    }
+    publish(getCurrentDataflowExecutionState(), record);
+  }
+
+  public void publish(@Nullable DataflowExecutionState executionState, 
LogRecord record) {
+    boolean isDirectLog = isConfiguredDirectLog(record);
+    if (isDirectLog) {
+      if (directThrottler.shouldAttemptDirectLog()) {
+        try {
+          LogEntry logEntry = constructDirectLogEntry(record, executionState);
+          if (testDirectLogInterceptor != null) {
+            // The default labels are applied by write options generally, but 
we merge them in here
+            // so they are visible to the test.
+            HashMap<String, String> mergedLabels = new 
HashMap<>(defaultLabels);
+            mergedLabels.putAll(logEntry.getLabels());
+            logEntry = logEntry.toBuilder().setLabels(mergedLabels).build();
+            if (logEntry.getResource() == null) {
+              logEntry = 
logEntry.toBuilder().setResource(steplessMonitoredResource).build();
+            }
+            checkNotNull(testDirectLogInterceptor).accept(logEntry);
+          } else {
+            checkNotNull(directLogging).write(ImmutableList.of(logEntry), 
directWriteOptions);

Review Comment:
   Reposting here since my email reply didn't show up in the thread:
   
   Yes, I was using that at first, but it has some downsides:
   - It formats the message as a string instead of a structured payload.
   - It adds labels we don't have in existing logs.
   - We need to modify the MonitoredResource per log entry, since it includes
   the step name, and the only way to do that was to override the protected
   `logEntryFor` method rather than using enhancers etc.
   - We want to ensure we don't double-log, which is tricky if there are two
   separate handlers.
   
   I was originally delegating to an instance of LoggingHandler, but it
   actually wasn't saving much code and was doing extra things we didn't want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to