arunpandianp commented on code in PR #37662:
URL: https://github.com/apache/beam/pull/37662#discussion_r2908757159


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -33,71 +51,148 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.logging.ErrorManager;
 import java.util.logging.Handler;
+import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import java.util.logging.SimpleFormatter;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import 
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import 
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CountingOutputStream;
 import org.slf4j.MDC;
 
 /**
  * Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception 
is represented using
  * {@link Throwable#printStackTrace()}.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
+@SuppressWarnings("method.invocation")
 public class DataflowWorkerLoggingHandler extends Handler {
-  private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String>
-      BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL;
-
-  static {
-    BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new 
EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class);
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "TRACE"
-    // onto "DEBUG" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO, 
"INFO");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE, 
"NOTICE");
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "WARN" onto
-    // "WARNING" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN, 
"WARNING");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR, 
"ERROR");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL,
 "CRITICAL");
-  }
-
   /**
    * Buffer size to use when writing logs. This matches <a
    * href="https://cloud.google.com/logging/quotas#log-limits">Logging usage 
limits</a> to avoid
    * spreading the same log entry across multiple disk flushes.
    */
   private static final int LOGGING_WRITER_BUFFER_SIZE = 262144; // 256kb
 
+  // Used as a side-channel for a Logger for which the configured non-direct 
logging level doesn't
+  // match the default
+  // logging level and which is using direct logging.
+  private static class DirectHintResourceBundle extends ResourceBundle {
+    private static final String LEVEL_KEY = "NonDirectLogLevel";
+    private final Level nonDirectLogLevel;
+
+    DirectHintResourceBundle(Level directLevel) {

Review Comment:
   ```suggestion
       DirectHintResourceBundle(Level nonDirectLogLevel) {
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -131,28 +225,328 @@ public DataflowWorkerLoggingHandler(String filename, 
long sizeLimit) throws IOEx
     createOutputStream();
   }
 
-  public synchronized void setLogMdc(boolean enabled) {
-    this.logCustomMdc = enabled;
+  public void setLogMdc(boolean enabled) {
+    logCustomMdc = enabled;
   }
 
-  @Override
-  public synchronized void publish(LogRecord record) {
-    DataflowExecutionState currrentDataflowState = null;
-    ExecutionState currrentState = 
ExecutionStateTracker.getCurrentExecutionState();
-    if (currrentState instanceof DataflowExecutionState) {
-      currrentDataflowState = (DataflowExecutionState) currrentState;
-    }
-    // It's okay to pass in the null state, publish() handles and tests this.
-    publish(currrentDataflowState, record);
+  private void initializeLabelMaps(PipelineOptions options) {
+    DataflowPipelineOptions dataflowOptions = 
options.as(DataflowPipelineOptions.class);
+    DataflowWorkerHarnessOptions harnessOptions = 
options.as(DataflowWorkerHarnessOptions.class);
+    @Nullable String jobId = harnessOptions.getJobId();
+    if (jobId == null || jobId.isEmpty()) {
+      jobId = GceMetadataUtil.fetchDataflowJobId();
+    }
+    jobId = middleCrop(jobId, LABEL_MAX_LENGTH);
+
+    @Nullable String jobName = dataflowOptions.getJobName();
+    if (jobName == null || jobName.isEmpty()) {
+      jobName = GceMetadataUtil.fetchDataflowJobName();
+    }
+    jobName = middleCrop(jobName, LABEL_MAX_LENGTH);
+
+    String region = dataflowOptions.getRegion();
+    if (region.isEmpty()) {
+      region = GceMetadataUtil.fetchDataflowRegion();
+    }
+    region = middleCrop(region, LABEL_MAX_LENGTH);
+
+    // Note that the id in the options is a string name, not the numeric VM id.
+    @Nullable String workerName = harnessOptions.getWorkerId();
+    if (workerName == null || workerName.isEmpty()) {
+      workerName = GceMetadataUtil.fetchDataflowWorkerName();
+    }
+    workerName = middleCrop(workerName, LABEL_MAX_LENGTH);
+
+    String workerId = middleCrop(GceMetadataUtil.fetchDataflowWorkerId(), 
LABEL_MAX_LENGTH);
+
+    @Nullable String projectId = harnessOptions.getProject();
+    if (projectId == null || projectId.isEmpty()) {
+      projectId = checkNotNull(ServiceOptions.getDefaultProjectId());
+    }
+    projectId = middleCrop(projectId, LABEL_MAX_LENGTH);
+
+    ImmutableMap.Builder<String, String> defaultLabelsBuilder = new 
ImmutableMap.Builder<>();
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_type", 
"instance");
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_name", 
workerName);
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_id", workerId);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/region", region);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/job_name", jobName);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/job_id", jobId);
+    defaultLabels = defaultLabelsBuilder.buildOrThrow();
+
+    ImmutableMap.Builder<String, String> resourceLabelBuilder = new 
ImmutableMap.Builder<>();
+    resourceLabelBuilder.put("job_id", jobId);
+    resourceLabelBuilder.put("job_name", jobName);
+    resourceLabelBuilder.put("project_id", projectId);
+    resourceLabelBuilder.put("region", region);
+    // We add the step when constructing the resource as it can change.
+    defaultResourceLabels = resourceLabelBuilder.buildOrThrow();
+    steplessMonitoredResource =
+        MonitoredResource.newBuilder(RESOURCE_TYPE)
+            .setLabels(defaultResourceLabels)
+            .addLabel(STEP_RESOURCE_LABEL, "")
+            .build();
+  }
+
+  private static String middleCrop(String value, int maxSize) {
+    if (value.length() <= maxSize) {
+      return value;
+    }
+    if (maxSize < 3) {
+      return value.substring(0, maxSize);
+    }
+    int firstHalfSize = (maxSize - 2) / 2;
+    int secondHalfSize = (maxSize - 3) / 2;
+    return value.substring(0, firstHalfSize)
+        + "..."
+        + value.substring(value.length() - secondHalfSize);
   }
 
-  public synchronized void publish(DataflowExecutionState 
currentExecutionState, LogRecord record) {
-    if (!isLoggable(record)) {
+  private static Severity severityFor(Level level) {
+    if (level instanceof LoggingLevel) {
+      return ((LoggingLevel) level).getSeverity();
+    }
+    // Choose the severity based on Level range, rounding down.
+    // The assumption is that Level values below maintain same numeric value
+    int value = level.intValue();
+    if (value < Level.INFO.intValue()) {
+      // Includes Level.CONFIG, Level.FINE, Level.FINER, Level.FINEST
+      return Severity.DEBUG;
+    } else if (value < Level.WARNING.intValue()) {
+      return Severity.INFO;
+    } else if (value < Level.SEVERE.intValue()) {
+      return Severity.WARNING;
+    } else if (value < Level.OFF.intValue()) {
+      return Severity.ERROR;
+    }
+    // Level.OFF is special meaning not to log.
+    return Severity.NONE;
+  }
+
+  private void addLogField(
+      Struct.Builder builder, String field, @Nullable String value, int 
maxSize) {
+    if (value == null || value.isEmpty()) {
       return;
     }
+    builder.putFieldsBuilderIfAbsent(field).setStringValue(middleCrop(value, 
maxSize));
+  }
+
+  @VisibleForTesting
+  LogEntry constructDirectLogEntry(
+      LogRecord record, @Nullable DataflowExecutionState executionState) {
+    Struct.Builder payloadBuilder = Struct.newBuilder();
+    addLogField(payloadBuilder, "logger", record.getLoggerName(), 
FIELD_MAX_LENGTH);
+    addLogField(
+        payloadBuilder, "message", getFormatter().formatMessage(record), 
MESSAGE_MAX_LENGTH);
+    addLogField(
+        payloadBuilder, "exception", formatException(record.getThrown()), 
MESSAGE_MAX_LENGTH);
+    addLogField(payloadBuilder, "thread", 
String.valueOf(record.getThreadID()), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "stage", 
DataflowWorkerLoggingMDC.getStageName(), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "worker", 
DataflowWorkerLoggingMDC.getWorkerId(), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "work", DataflowWorkerLoggingMDC.getWorkId(), 
FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "job", DataflowWorkerLoggingMDC.getJobId(), 
FIELD_MAX_LENGTH);
+    if (logCustomMdc) {
+      @Nullable Map<String, String> mdcMap = MDC.getCopyOfContextMap();
+      if (mdcMap != null && !mdcMap.isEmpty()) {
+        Struct.Builder customDataBuilder =
+            
payloadBuilder.putFieldsBuilderIfAbsent("custom_data").getStructValueBuilder();
+        mdcMap.entrySet().stream()
+            .sorted(Map.Entry.comparingByKey())
+            .forEach(
+                (entry) -> {
+                  if (entry.getKey() != null) {
+                    addLogField(
+                        customDataBuilder, entry.getKey(), entry.getValue(), 
FIELD_MAX_LENGTH);
+                  }
+                });
+      }
+    }
+
+    @Nullable String stepId = null;
+    if (executionState != null) {
+      @Nullable NameContext nameContext = executionState.getStepName();
+      if (nameContext != null) {
+        stepId = nameContext.userName();
+      }
+    }
+    addLogField(payloadBuilder, "step", stepId, FIELD_MAX_LENGTH);
+
+    LogEntry.Builder builder =
+        LogEntry.newBuilder(Payload.JsonPayload.of(payloadBuilder.build()))
+            .setTimestamp(Instant.ofEpochMilli(record.getMillis()))
+            .setSeverity(severityFor(record.getLevel()));
+
+    if (stepId != null) {
+      builder.setResource(
+          MonitoredResource.newBuilder(RESOURCE_TYPE)
+              .setLabels(defaultResourceLabels)
+              .addLabel(STEP_RESOURCE_LABEL, stepId)
+              .build());
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Enables logging of records directly to Cloud Logging instead of logging 
to the disk. Should
+   * only be called once.
+   *
+   * @param pipelineOptions the pipelineOptions, used to configure buffers etc.
+   * @param defaultNonDirectLogLevel the default level at which logs should be 
logged to disk.
+   *     LogEntries that are below this level will be logged directly to Cloud 
Logging. The behavior
+   *     for a specific LogEntry can be overridden by attaching a 
ResourceBundle obtained from
+   *     resourceBundleForNonDirectLogLevelHint to it.
+   */
+  synchronized void enableDirectLogging(
+      PipelineOptions pipelineOptions,
+      Level defaultNonDirectLogLevel,
+      @Nullable Consumer<LogEntry> testDirectLogInterceptor) {
+    checkState(
+        directLogging == null && this.testDirectLogInterceptor == null,
+        "enableDirectLogging should only be called once on a 
DataflowWorkerLoggingHandler");
+    initializeLabelMaps(pipelineOptions);
+
+    DataflowWorkerLoggingOptions dfLoggingOptions =
+        pipelineOptions.as(DataflowWorkerLoggingOptions.class);
+    this.fallbackDirectErrorsToDisk =
+        
Boolean.TRUE.equals(dfLoggingOptions.getDirectLoggingFallbackToDiskOnErrors());
+    directThrottler.setCooldownDuration(
+        
Duration.ofSeconds(dfLoggingOptions.getDirectLoggingCooldownSeconds()));
+
+    if (testDirectLogInterceptor == null) {
+      // Override some of the default settings.
+      LoggingOptions cloudLoggingOptions =
+          LoggingOptions.newBuilder()
+              .setBatchingSettings(
+                  BatchingSettings.newBuilder()
+                      .setFlowControlSettings(
+                          FlowControlSettings.newBuilder()
+                              .setLimitExceededBehavior(
+                                  
FlowController.LimitExceededBehavior.ThrowException)
+                              .setMaxOutstandingRequestBytes(
+                                  
dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit())
+                              .setMaxOutstandingElementCount(
+                                  
dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit())
+                              .build())
+                      // These thresholds match the default settings in
+                      // LoggingServiceV2StubSettings.java for 
writeLogEntries.  We must re-set them
+                      // when we override the flow control settings because 
otherwise
+                      // BatchingSettings defaults to no batching.
+                      .setElementCountThreshold(
+                          Math.min(
+                              1000L,
+                              Math.max(
+                                  1L,
+                                  
dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit() / 2)))
+                      .setRequestByteThreshold(
+                          Math.min(
+                              1048576L,
+                              Math.max(
+                                  1, 
dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit() / 2)))
+                      .setDelayThresholdDuration(Duration.ofMillis(50L))
+                      .build())
+              .build();
+      ArrayList<Logging.WriteOption> writeOptions = new ArrayList<>();
+      writeOptions.add(Logging.WriteOption.labels(defaultLabels));
+      writeOptions.add(Logging.WriteOption.logName(LOG_TYPE));
+      
writeOptions.add(Logging.WriteOption.resource(steplessMonitoredResource));
+      this.directWriteOptions = Iterables.toArray(writeOptions, 
Logging.WriteOption.class);
+
+      Logging directLogging = cloudLoggingOptions.getService();
+      directLogging.setFlushSeverity(Severity.NONE);
+      directLogging.setWriteSynchronicity(Synchronicity.ASYNC);
+
+      this.directLogging = directLogging;
+    } else {
+      this.testDirectLogInterceptor = testDirectLogInterceptor;
+    }
+    this.defaultNonDirectLogLevel = defaultNonDirectLogLevel;
+  }
+
+  private static @Nullable DataflowExecutionState 
getCurrentDataflowExecutionState() {
+    @Nullable ExecutionState currentState = 
ExecutionStateTracker.getCurrentExecutionState();
+    if (!(currentState instanceof DataflowExecutionState)) {
+      return null;
+    }
+    return (DataflowExecutionState) currentState;
+  }
+
+  @Override
+  public void publish(@Nullable LogRecord record) {
+    if (record == null) {
+      return;
+    }
+    publish(getCurrentDataflowExecutionState(), record);
+  }
+
+  public void publish(@Nullable DataflowExecutionState executionState, 
LogRecord record) {
+    boolean isDirectLog = isConfiguredDirectLog(record);
+    if (isDirectLog) {
+      if (directThrottler.shouldAttemptDirectLog()) {
+        try {
+          LogEntry logEntry = constructDirectLogEntry(record, executionState);
+          if (testDirectLogInterceptor != null) {
+            // The default labels are applied by write options generally bute 
we merge them in here

Review Comment:
   ```suggestion
               // The default labels are applied by write options generally but 
we merge them in here
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -33,71 +51,148 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.logging.ErrorManager;
 import java.util.logging.Handler;
+import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import java.util.logging.SimpleFormatter;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import 
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import 
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CountingOutputStream;
 import org.slf4j.MDC;
 
 /**
  * Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception 
is represented using
  * {@link Throwable#printStackTrace()}.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
+@SuppressWarnings("method.invocation")
 public class DataflowWorkerLoggingHandler extends Handler {
-  private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String>
-      BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL;
-
-  static {
-    BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new 
EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class);
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "TRACE"
-    // onto "DEBUG" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO, 
"INFO");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE, 
"NOTICE");
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "WARN" onto
-    // "WARNING" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN, 
"WARNING");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR, 
"ERROR");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL,
 "CRITICAL");
-  }
-
   /**
    * Buffer size to use when writing logs. This matches <a
    * href="https://cloud.google.com/logging/quotas#log-limits">Logging usage 
limits</a> to avoid
    * spreading the same log entry across multiple disk flushes.
    */
   private static final int LOGGING_WRITER_BUFFER_SIZE = 262144; // 256kb
 
+  // Used as a side-channel for a Logger for which the configured non-direct 
logging level doesn't
+  // match the default
+  // logging level and which is using direct logging.
+  private static class DirectHintResourceBundle extends ResourceBundle {
+    private static final String LEVEL_KEY = "NonDirectLogLevel";
+    private final Level nonDirectLogLevel;
+
+    DirectHintResourceBundle(Level directLevel) {
+      this.nonDirectLogLevel = directLevel;
+    }
+
+    @Override
+    public String getBaseBundleName() {
+      return "DataflowWorkerLoggingHandler";
+    }
+
+    @Override
+    protected Object handleGetObject(@Nonnull String s) {
+      if (LEVEL_KEY.equals(s)) {
+        return nonDirectLogLevel;
+      }
+      return new MissingResourceException(
+          "The only valid key is " + LEVEL_KEY, this.getClass().getName(), s);
+    }
+
+    @Override
+    public @Nonnull Enumeration<String> getKeys() {
+      return Iterators.asEnumeration(Iterators.singletonIterator(LEVEL_KEY));
+    }
+  }
+  // Since there are just a couple possible levels, we cache them.
+  private static final ConcurrentHashMap<Level, ResourceBundle> 
resourceBundles =
+      new ConcurrentHashMap<>();
+
+  @VisibleForTesting
+  static ResourceBundle resourceBundleForNonDirectLogLevelHint(Level 
nonDirectLogLevel) {
+    return resourceBundles.computeIfAbsent(nonDirectLogLevel, 
DirectHintResourceBundle::new);
+  }
+
   /** If true, add SLF4J MDC to custom_data of the log message. */
+  @LazyInit private boolean logCustomMdc = false;
+
+  // All of the direct logging related fields are only initialized if 
enableDirectLogging is called.
+  //  Afterwards they
+  // are logically final.
+  @LazyInit private @Nullable Logging directLogging = null;
+  @LazyInit private boolean fallbackDirectErrorsToDisk = false;
+  @LazyInit private Level defaultNonDirectLogLevel = Level.ALL;
+  @LazyInit private Logging.WriteOption[] directWriteOptions = new 
Logging.WriteOption[0];
+  @LazyInit private ImmutableMap<String, String> defaultResourceLabels = 
ImmutableMap.of();
+
+  @LazyInit
+  private MonitoredResource steplessMonitoredResource =
+      MonitoredResource.newBuilder(RESOURCE_TYPE).build();
+
+  @LazyInit private ImmutableMap<String, String> defaultLabels = 
ImmutableMap.of();
+  @LazyInit private @Nullable Consumer<LogEntry> testDirectLogInterceptor;

Review Comment:
   These fields (except `logCustomMdc`) are set in `enableDirectLogging`, 
which is synchronized, but they are read in `publish` without any 
synchronization. What makes the threads calling `publish` see the 
updated/consistent values?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -33,71 +51,148 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.logging.ErrorManager;
 import java.util.logging.Handler;
+import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import java.util.logging.SimpleFormatter;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import 
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import 
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CountingOutputStream;
 import org.slf4j.MDC;
 
 /**
  * Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception 
is represented using
  * {@link Throwable#printStackTrace()}.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
+@SuppressWarnings("method.invocation")
 public class DataflowWorkerLoggingHandler extends Handler {
-  private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String>
-      BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL;
-
-  static {
-    BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new 
EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class);
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "TRACE"
-    // onto "DEBUG" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO, 
"INFO");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE, 
"NOTICE");
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "WARN" onto
-    // "WARNING" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN, 
"WARNING");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR, 
"ERROR");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL,
 "CRITICAL");
-  }
-
   /**
    * Buffer size to use when writing logs. This matches <a
    * href="https://cloud.google.com/logging/quotas#log-limits">Logging usage 
limits</a> to avoid
    * spreading the same log entry across multiple disk flushes.
    */
   private static final int LOGGING_WRITER_BUFFER_SIZE = 262144; // 256kb
 
+  // Used as a side-channel for a Logger for which the configured non-direct 
logging level doesn't
+  // match the default
+  // logging level and which is using direct logging.
+  private static class DirectHintResourceBundle extends ResourceBundle {
+    private static final String LEVEL_KEY = "NonDirectLogLevel";
+    private final Level nonDirectLogLevel;
+
+    DirectHintResourceBundle(Level directLevel) {
+      this.nonDirectLogLevel = directLevel;
+    }
+
+    @Override
+    public String getBaseBundleName() {
+      return "DataflowWorkerLoggingHandler";
+    }
+
+    @Override
+    protected Object handleGetObject(@Nonnull String s) {
+      if (LEVEL_KEY.equals(s)) {
+        return nonDirectLogLevel;
+      }
+      return new MissingResourceException(
+          "The only valid key is " + LEVEL_KEY, this.getClass().getName(), s);
+    }
+
+    @Override
+    public @Nonnull Enumeration<String> getKeys() {
+      return Iterators.asEnumeration(Iterators.singletonIterator(LEVEL_KEY));
+    }
+  }
+  // Since there are just a couple possible levels, we cache them.
+  private static final ConcurrentHashMap<Level, ResourceBundle> 
resourceBundles =
+      new ConcurrentHashMap<>();
+
+  @VisibleForTesting
+  static ResourceBundle resourceBundleForNonDirectLogLevelHint(Level 
nonDirectLogLevel) {
+    return resourceBundles.computeIfAbsent(nonDirectLogLevel, 
DirectHintResourceBundle::new);
+  }
+
   /** If true, add SLF4J MDC to custom_data of the log message. */
+  @LazyInit private boolean logCustomMdc = false;
+
+  // All of the direct logging related fields are only initialized if 
enableDirectLogging is called.
+  //  Afterwards they
+  // are logically final.

Review Comment:
   ```suggestion
     //  Afterwards they are logically final.
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -33,71 +51,148 @@
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.logging.ErrorManager;
 import java.util.logging.Handler;
+import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import java.util.logging.SimpleFormatter;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import 
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import 
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CountingOutputStream;
 import org.slf4j.MDC;
 
 /**
  * Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception 
is represented using
  * {@link Throwable#printStackTrace()}.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
+@SuppressWarnings("method.invocation")
 public class DataflowWorkerLoggingHandler extends Handler {
-  private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String>
-      BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL;
-
-  static {
-    BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new 
EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class);
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "TRACE"
-    // onto "DEBUG" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO, 
"INFO");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE, 
"NOTICE");
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "WARN" onto
-    // "WARNING" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN, 
"WARNING");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR, 
"ERROR");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL,
 "CRITICAL");
-  }
-
   /**
    * Buffer size to use when writing logs. This matches <a
    * href="https://cloud.google.com/logging/quotas#log-limits";>Logging usage 
limits</a> to avoid
    * spreading the same log entry across multiple disk flushes.
    */
   private static final int LOGGING_WRITER_BUFFER_SIZE = 262144; // 256kb
 
+  // Used as a side-channel for a Logger for which the configured non-direct 
logging level doesn't
+  // match the default
+  // logging level and which is using direct logging.
+  private static class DirectHintResourceBundle extends ResourceBundle {
+    private static final String LEVEL_KEY = "NonDirectLogLevel";
+    private final Level nonDirectLogLevel;
+
+    DirectHintResourceBundle(Level directLevel) {
+      this.nonDirectLogLevel = directLevel;
+    }
+
+    @Override
+    public String getBaseBundleName() {
+      return "DataflowWorkerLoggingHandler";
+    }
+
+    @Override
+    protected Object handleGetObject(@Nonnull String s) {
+      if (LEVEL_KEY.equals(s)) {
+        return nonDirectLogLevel;
+      }
+      return new MissingResourceException(
+          "The only valid key is " + LEVEL_KEY, this.getClass().getName(), s);
+    }
+
+    @Override
+    public @Nonnull Enumeration<String> getKeys() {
+      return Iterators.asEnumeration(Iterators.singletonIterator(LEVEL_KEY));
+    }
+  }
+  // Since there are just a couple possible levels, we cache them.
+  private static final ConcurrentHashMap<Level, ResourceBundle> 
resourceBundles =
+      new ConcurrentHashMap<>();
+
+  @VisibleForTesting
+  static ResourceBundle resourceBundleForNonDirectLogLevelHint(Level 
nonDirectLogLevel) {
+    return resourceBundles.computeIfAbsent(nonDirectLogLevel, 
DirectHintResourceBundle::new);
+  }
+
   /** If true, add SLF4J MDC to custom_data of the log message. */
+  @LazyInit private boolean logCustomMdc = false;
+
+  // All of the direct logging related fields are only initialized if 
enableDirectLogging is called.
+  //  Afterwards they
+  // are logically final.
+  @LazyInit private @Nullable Logging directLogging = null;
+  @LazyInit private boolean fallbackDirectErrorsToDisk = false;
+  @LazyInit private Level defaultNonDirectLogLevel = Level.ALL;

Review Comment:
   Can we group the direct-logging-related `@LazyInit` fields into a single 
class, `DirectLoggingOptions`, and have a single `@LazyInit DirectLoggingOptions` 
field? Grouping them would make it clear that updates to the grouped fields 
become visible consistently (all or nothing).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to