This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 57ab2b96113 [Dataflow Java Runner] Add support for sending logs 
directly to Cloud Logging (#37662)
57ab2b96113 is described below

commit 57ab2b961133ec47111ba9e3bbd16610901b84aa
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Mar 10 19:42:12 2026 +0000

    [Dataflow Java Runner] Add support for sending logs directly to Cloud 
Logging (#37662)
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 runners/google-cloud-dataflow-java/build.gradle    |   2 +
 .../options/DataflowWorkerLoggingOptions.java      |  74 ++
 .../google-cloud-dataflow-java/worker/build.gradle |   7 +-
 .../logging/DataflowWorkerLoggingHandler.java      | 778 +++++++++++++++++----
 .../logging/DataflowWorkerLoggingInitializer.java  | 180 ++++-
 .../JulHandlerPrintStreamAdapterFactory.java       |  11 +-
 .../logging/DataflowWorkerLoggingHandlerTest.java  | 504 +++++++++++--
 .../DataflowWorkerLoggingInitializerTest.java      | 111 ++-
 .../JulHandlerPrintStreamAdapterFactoryTest.java   |   4 +-
 .../sdk/extensions/gcp/util/GceMetadataUtil.java   |  14 +-
 11 files changed, 1425 insertions(+), 262 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 31248c641e3..9382527fa36 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -758,6 +758,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_cloud_datastore_v1_proto_client      : 
"com.google.cloud.datastore:datastore-v1-proto-client:2.34.0",   // 
[bomupgrader] sets version
         google_cloud_firestore                      : 
"com.google.cloud:google-cloud-firestore", // 
google_cloud_platform_libraries_bom sets version
         google_cloud_kms                            : 
"com.google.cloud:google-cloud-kms", // google_cloud_platform_libraries_bom 
sets version
+        google_cloud_logging                        : 
"com.google.cloud:google-cloud-logging", // google_cloud_platform_libraries_bom 
sets version
         google_cloud_pubsub                         : 
"com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom 
sets version
         // [bomupgrader] the BOM version is set by 
scripts/tools/bomupgrader.py. If update manually, also update
         // libraries-bom version on 
sdks/java/container/license_scripts/dep_urls_java.yaml
@@ -773,7 +774,6 @@ class BeamModulePlugin implements Plugin<Project> {
         google_http_client_apache_v2                : 
"com.google.http-client:google-http-client-apache-v2", // 
google_cloud_platform_libraries_bom sets version
         google_http_client_gson                     : 
"com.google.http-client:google-http-client-gson", // 
google_cloud_platform_libraries_bom sets version
         google_http_client_jackson                  : 
"com.google.http-client:google-http-client-jackson:1.29.2",
-        google_http_client_gson                     : 
"com.google.http-client:google-http-client-gson", // 
google_cloud_platform_libraries_bom sets version
         google_http_client_protobuf                 : 
"com.google.http-client:google-http-client-protobuf", // 
google_cloud_platform_libraries_bom sets version
         google_oauth_client                         : 
"com.google.oauth-client:google-oauth-client:$google_oauth_clients_version",
         google_oauth_client_java6                   : 
"com.google.oauth-client:google-oauth-client-java6:$google_oauth_clients_version",
diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index c064aeed002..5a90926a239 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -110,6 +110,8 @@ dependencies {
   implementation library.java.google_http_client
   implementation library.java.google_http_client_gson
   permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
+  implementation library.java.google_cloud_logging
+  permitUnusedDeclared library.java.google_cloud_logging // BEAM-11761
   implementation library.java.hamcrest
   implementation library.java.jackson_annotations
   implementation library.java.jackson_core
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
index 5322539b80e..ab412d6cfab 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
@@ -66,6 +66,38 @@ public interface DataflowWorkerLoggingOptions extends 
PipelineOptions {
 
   void setDefaultWorkerLogLevel(Level level);
 
+  /**
+   * Controls the log level for which messages are uploaded to Cloud Logging. 
If a message is
+   * configured to be sent both directly to cloud logging and to default 
disk-based logging it will
+   * just be sent to disk-based logging. This allows for configuration such as
+   * "--defaultWorkerLogLevel=WARN --defaultWorkerDirectLoggerLevel=INFO" where 
INFO logs will be
+   * directly sent to cloud logging and WARN logs and higher will be sent to 
disk-based logging.
+   *
+   * <p>Note that this is just the default and may be overridden for specific 
classes with
+   * --workerDirectLogLevelOverrides.
+   */
+  @Description(
+      "Controls the default direct to Cloud Logging level of all logs without 
a log level override. "
+          + "If a message is configured to be sent both directly to cloud 
logging and to default disk-based logging "
+          + "it will just be sent to disk-based logging.")
+  @Default.Enum("OFF")
+  Level getDefaultWorkerDirectLoggerLevel();
+
+  void setDefaultWorkerDirectLoggerLevel(Level level);
+
+  @Description(
+      "The maximum buffered bytes for records in the queue that are being sent 
directly to Cloud Logging.")
+  @Default.Long(100L * 1024 * 1024)
+  Long getWorkerDirectLoggerBufferByteLimit();
+
+  void setWorkerDirectLoggerBufferByteLimit(Long value);
+
+  @Description("The maximum buffered elements in the queue being sent directly 
to Cloud Logging.")
+  @Default.Long(1_000_000)
+  Long getWorkerDirectLoggerBufferElementLimit();
+
+  void setWorkerDirectLoggerBufferElementLimit(Long value);
+
   /**
    * Controls the log level given to messages printed to {@code System.out}.
    *
@@ -121,6 +153,48 @@ public interface DataflowWorkerLoggingOptions extends 
PipelineOptions {
 
   void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
 
+  /**
+   * This option controls the direct log levels for specifically named 
loggers. If a message is
+   * configured to be sent both directly to cloud logging and to default 
disk-based logging it will
+   * just be sent to disk-based logging. If an override only exists for a 
logger for direct logging,
+   * the --defaultWorkerLogLevel will be used for the non-direct configuration 
for the logger.
+   *
+   * <p>Later options with equivalent names override earlier options.
+   *
+   * <p>See {@link WorkerLogLevelOverrides} for more information on how to 
configure logging on a
+   * per {@link Class}, {@link Package}, or name basis. If used from the 
command line, the expected
+   * format is {"Name":"Level",...}, further details on {@link 
WorkerLogLevelOverrides#from}.
+   */
+  @Description(
+      "This option controls the direct log levels for specifically named 
loggers. "
+          + "The expected format is {\"Name\":\"Level\",...}. The Dataflow 
worker supports a logging "
+          + "hierarchy based off of names that are '.' separated. For example, 
by specifying the value "
+          + "{\"a.b.c.Foo\":\"DEBUG\"}, the logger for the class 'a.b.c.Foo' 
will be configured to "
+          + "output logs at the DEBUG level. Similarly, by specifying the 
value {\"a.b.c\":\"WARN\"}, "
+          + "all loggers underneath the 'a.b.c' package will be configured to 
output logs at the WARN "
+          + "level. System.out and System.err levels are configured via 
loggers of the corresponding "
+          + "name. Also, note that when multiple overrides are specified, the 
exact name followed by "
+          + "the closest parent takes precedence. Note that if an override is 
just provided for the direct log level "
+          + "for a logger, the default non-direct log level will be used for 
non-direct logs.")
+  WorkerLogLevelOverrides getWorkerDirectLogLevelOverrides();
+
+  void setWorkerDirectLogLevelOverrides(WorkerLogLevelOverrides value);
+
+  @Default.Boolean(true)
+  @Description(
+      "If true, when there are errors with sending logs directly to Cloud 
Logging, the logs will fall back to "
+          + "disk-based logging. If false, such logs will be dropped.")
+  Boolean getDirectLoggingFallbackToDiskOnErrors();
+
+  void setDirectLoggingFallbackToDiskOnErrors(Boolean value);
+
+  @Default.Integer(10)
+  @Description(
+      "If an error is encountered with sending logs directly to Cloud Logging, 
direct logging will not be attempted for this many seconds.")
+  Integer getDirectLoggingCooldownSeconds();
+
+  void setDirectLoggingCooldownSeconds(Integer value);
+
   /**
    * Defines a log level override for a specific class, package, or name.
    *
diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle 
b/runners/google-cloud-dataflow-java/worker/build.gradle
index 4068c5f88e4..610fa411459 100644
--- a/runners/google-cloud-dataflow-java/worker/build.gradle
+++ b/runners/google-cloud-dataflow-java/worker/build.gradle
@@ -151,7 +151,6 @@ applyJavaNature(
             exclude "META-INF/LICENSE.txt"
             exclude "about.html"
         })
-
 
/******************************************************************************/
 // Configure the worker root project
 
@@ -167,6 +166,11 @@ configurations {
 
 dependencies {
     implementation 
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+    implementation library.java.gax
+    implementation library.java.google_cloud_core
+    implementation library.java.guava
+    permitUnusedDeclared library.java.guava
+    implementation library.java.protobuf_java
 
     // Note that any dependency that is modified here should also be modified 
within
     // runners/google-cloud-dataflow-java/worker/build.gradle using the rules 
provided
@@ -193,6 +197,7 @@ dependencies {
 
     implementation library.java.google_auth_library_credentials
     implementation library.java.proto_google_common_protos
+    implementation library.java.google_cloud_logging
 
     // Conscrypt shouldn't be included here because Conscrypt won't work when 
being shaded.
     // (Context: https://github.com/apache/beam/pull/13846)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index 14af9851080..e81c277c7d7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.runners.dataflow.worker.logging;
 
+import static 
org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils.abbreviateMiddle;
 import static 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.LEVELS;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import com.fasterxml.jackson.core.JsonEncoding;
 import com.fasterxml.jackson.core.JsonFactory;
@@ -25,6 +28,22 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.batching.FlowController;
+import com.google.cloud.MonitoredResource;
+import com.google.cloud.ServiceOptions;
+import com.google.cloud.logging.LogEntry;
+import com.google.cloud.logging.Logging;
+import com.google.cloud.logging.LoggingLevel;
+import com.google.cloud.logging.LoggingOptions;
+import com.google.cloud.logging.Payload;
+import com.google.cloud.logging.Severity;
+import com.google.cloud.logging.Synchronicity;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.errorprone.annotations.concurrent.LazyInit;
+import com.google.protobuf.Struct;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -33,22 +52,43 @@ import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.logging.ErrorManager;
 import java.util.logging.Handler;
+import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import java.util.logging.SimpleFormatter;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import 
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import 
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.sdk.options.PipelineOptions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CountingOutputStream;
 import org.slf4j.MDC;
 
@@ -56,30 +96,9 @@ import org.slf4j.MDC;
  * Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception 
is represented using
  * {@link Throwable#printStackTrace()}.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
+@SuppressWarnings("method.invocation")
 public class DataflowWorkerLoggingHandler extends Handler {
-  private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String>
-      BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL;
-
-  static {
-    BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new 
EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class);
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "TRACE"
-    // onto "DEBUG" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG, 
"DEBUG");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO, 
"INFO");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE, 
"NOTICE");
-    // Note that Google Cloud Logging only defines a fixed number of 
severities and maps "WARN" onto
-    // "WARNING" as seen here: 
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
-    // 
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN, 
"WARNING");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR, 
"ERROR");
-    
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL,
 "CRITICAL");
-  }
-
   /**
    * Buffer size to use when writing logs. This matches <a
    * href="https://cloud.google.com/logging/quotas#log-limits";>Logging usage 
limits</a> to avoid
@@ -87,9 +106,107 @@ public class DataflowWorkerLoggingHandler extends Handler {
    */
   private static final int LOGGING_WRITER_BUFFER_SIZE = 262144; // 256kb
 
+  // Used as a side-channel for a Logger for which the configured non-direct 
logging level doesn't
+  // match the default
+  // logging level and which is using direct logging.
+  private static class DirectHintResourceBundle extends ResourceBundle {
+    private static final String LEVEL_KEY = "NonDirectLogLevel";
+    private final Level nonDirectLogLevel;
+
+    DirectHintResourceBundle(Level nonDirectLogLevel) {
+      this.nonDirectLogLevel = nonDirectLogLevel;
+    }
+
+    @Override
+    public String getBaseBundleName() {
+      return "DataflowWorkerLoggingHandler";
+    }
+
+    @Override
+    protected Object handleGetObject(@Nonnull String s) {
+      if (LEVEL_KEY.equals(s)) {
+        return nonDirectLogLevel;
+      }
+      throw new MissingResourceException(
+          "The only valid key is " + LEVEL_KEY, this.getClass().getName(), s);
+    }
+
+    @Override
+    public @Nonnull Enumeration<String> getKeys() {
+      return Iterators.asEnumeration(Iterators.singletonIterator(LEVEL_KEY));
+    }
+  }
+  // Since there are just a couple possible levels, we cache them.
+  private static final ConcurrentHashMap<Level, ResourceBundle> 
resourceBundles =
+      new ConcurrentHashMap<>();
+
+  @VisibleForTesting
+  static ResourceBundle resourceBundleForNonDirectLogLevelHint(Level 
nonDirectLogLevel) {
+    return resourceBundles.computeIfAbsent(nonDirectLogLevel, 
DirectHintResourceBundle::new);
+  }
+
   /** If true, add SLF4J MDC to custom_data of the log message. */
+  @LazyInit private boolean logCustomMdc = false;
+
+  // Only instantiated and set if enableDirectLogging is called.
+  private static class DirectLoggingState {
+    DirectLoggingState(
+        ImmutableMap<String, String> defaultLabels,
+        ImmutableMap<String, String> defaultResourceLabels,
+        MonitoredResource steplessMonitoredResource,
+        Level defaultNonDirectLogLevel,
+        boolean fallbackDirectErrorsToDisk,
+        @Nullable Logging directLogging,
+        Logging.WriteOption[] directWriteOptions,
+        @Nullable Consumer<LogEntry> testDirectLogInterceptor) {
+      checkState((directLogging == null) != (testDirectLogInterceptor == 
null));
+      this.defaultLabels = defaultLabels;
+      this.defaultResourceLabels = defaultResourceLabels;
+      this.steplessMonitoredResource = steplessMonitoredResource;
+      this.fallbackDirectErrorsToDisk = fallbackDirectErrorsToDisk;
+      this.defaultNonDirectLogLevel = defaultNonDirectLogLevel;
+      this.directLogging = directLogging;
+      this.directWriteOptions = directWriteOptions;
+      this.testDirectLogInterceptor =
+          testDirectLogInterceptor == null ? e -> {} : 
testDirectLogInterceptor;
+    }
+
+    final @Nullable Logging directLogging;
+    final boolean fallbackDirectErrorsToDisk;
+    final Level defaultNonDirectLogLevel;
+    final Logging.WriteOption[] directWriteOptions;
+    final ImmutableMap<String, String> defaultResourceLabels;
+    final MonitoredResource steplessMonitoredResource;
+    final ImmutableMap<String, String> defaultLabels;
+    final Consumer<LogEntry> testDirectLogInterceptor;
+  }
+
+  private final AtomicReference<DirectLoggingState> directLoggingState = new 
AtomicReference<>();
+
   @GuardedBy("this")
-  private boolean logCustomMdc = false;
+  Instant nextDirectFallbackLogInstant = Instant.EPOCH;
+
+  private static final String LOG_TYPE = "dataflow.googleapis.com%2Fworker";
+  private static final String RESOURCE_TYPE = "dataflow_step";
+  private static final String STEP_RESOURCE_LABEL = "step_id";
+  private static final int FIELD_MAX_LENGTH = 100;
+  private static final int LABEL_MAX_LENGTH = 100;
+  private static final int MESSAGE_MAX_LENGTH = 30_000;
+  // HACK: Detect stderr logs that originate from the LoggingImpl error 
handling. Hopefully this can
+  // be removed if
+  // error handling in LoggingImpl can be customized to use the ErrorHandler.
+  private static final String LOGGING_IMPL_STDERR_PREFIX = "ERROR: onFailure 
exception:";
+
+  // Null after close().
+  @GuardedBy("this")
+  private @Nullable JsonGenerator generator;
+
+  @GuardedBy("this")
+  private @Nullable CountingOutputStream counter;
+
+  private final long sizeLimit;
+  private final Supplier<OutputStream> outputStreamFactory;
+  private final JsonFactory generatorFactory;
 
   /**
    * Formats the throwable as per {@link Throwable#printStackTrace()}.
@@ -97,7 +214,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
    * @param thrown The throwable to format.
    * @return A string containing the contents of {@link 
Throwable#printStackTrace()}.
    */
-  public static String formatException(Throwable thrown) {
+  public static @Nullable String formatException(@Nullable Throwable thrown) {
     if (thrown == null) {
       return null;
     }
@@ -108,15 +225,14 @@ public class DataflowWorkerLoggingHandler extends Handler 
{
     return sw.toString();
   }
 
-  /** Constructs a handler that writes to a rotating set of files. */
-  public DataflowWorkerLoggingHandler(String filename, long sizeLimit) throws 
IOException {
-    this(new FileOutputStreamFactory(filename), sizeLimit);
-  }
-
   /**
    * Constructs a handler that writes to arbitrary output streams. No rollover 
if sizeLimit is zero
    * or negative.
    */
+  public DataflowWorkerLoggingHandler(String filename, long sizeLimit) throws 
IOException {
+    this(new FileOutputStreamFactory(filename), sizeLimit);
+  }
+
   DataflowWorkerLoggingHandler(Supplier<OutputStream> factory, long sizeLimit) 
throws IOException {
     this.setFormatter(new SimpleFormatter());
     this.outputStreamFactory = factory;
@@ -131,28 +247,335 @@ public class DataflowWorkerLoggingHandler extends 
Handler {
     createOutputStream();
   }
 
-  public synchronized void setLogMdc(boolean enabled) {
-    this.logCustomMdc = enabled;
+  public void setLogMdc(boolean enabled) {
+    logCustomMdc = enabled;
   }
 
-  @Override
-  public synchronized void publish(LogRecord record) {
-    DataflowExecutionState currrentDataflowState = null;
-    ExecutionState currrentState = 
ExecutionStateTracker.getCurrentExecutionState();
-    if (currrentState instanceof DataflowExecutionState) {
-      currrentDataflowState = (DataflowExecutionState) currrentState;
-    }
-    // It's okay to pass in the null state, publish() handles and tests this.
-    publish(currrentDataflowState, record);
+  private static Pair<ImmutableMap<String, String>, ImmutableMap<String, 
String>>
+      labelsFromOptionsAndMetadata(PipelineOptions options) {
+    DataflowPipelineOptions dataflowOptions = 
options.as(DataflowPipelineOptions.class);
+    DataflowWorkerHarnessOptions harnessOptions = 
options.as(DataflowWorkerHarnessOptions.class);
+    @Nullable String jobId = harnessOptions.getJobId();
+    if (jobId == null || jobId.isEmpty()) {
+      jobId = GceMetadataUtil.fetchDataflowJobId();
+    }
+    jobId = middleCrop(jobId, LABEL_MAX_LENGTH);
+
+    @Nullable String jobName = dataflowOptions.getJobName();
+    if (jobName == null || jobName.isEmpty()) {
+      jobName = GceMetadataUtil.fetchDataflowJobName();
+    }
+    jobName = middleCrop(jobName, LABEL_MAX_LENGTH);
+
+    String region = dataflowOptions.getRegion();
+    if (region.isEmpty()) {
+      region = GceMetadataUtil.fetchDataflowRegion();
+    }
+    region = middleCrop(region, LABEL_MAX_LENGTH);
+
+    // Note that the id in the options is a string name, not the numeric VM id.
+    @Nullable String workerName = harnessOptions.getWorkerId();
+    if (workerName == null || workerName.isEmpty()) {
+      workerName = GceMetadataUtil.fetchDataflowWorkerName();
+    }
+    workerName = middleCrop(workerName, LABEL_MAX_LENGTH);
+
+    String workerId = middleCrop(GceMetadataUtil.fetchDataflowWorkerId(), 
LABEL_MAX_LENGTH);
+
+    @Nullable String projectId = harnessOptions.getProject();
+    if (projectId == null || projectId.isEmpty()) {
+      projectId = checkNotNull(ServiceOptions.getDefaultProjectId());
+    }
+    projectId = middleCrop(projectId, LABEL_MAX_LENGTH);
+
+    ImmutableMap.Builder<String, String> defaultLabelsBuilder = new 
ImmutableMap.Builder<>();
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_type", 
"instance");
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_name", 
workerName);
+    defaultLabelsBuilder.put("compute.googleapis.com/resource_id", workerId);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/region", region);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/job_name", jobName);
+    defaultLabelsBuilder.put("dataflow.googleapis.com/job_id", jobId);
+
+    ImmutableMap.Builder<String, String> resourceLabelBuilder = new 
ImmutableMap.Builder<>();
+    resourceLabelBuilder.put("job_id", jobId);
+    resourceLabelBuilder.put("job_name", jobName);
+    resourceLabelBuilder.put("project_id", projectId);
+    resourceLabelBuilder.put("region", region);
+    // We add the step when constructing the resource as it can change.
+
+    return Pair.of(defaultLabelsBuilder.buildOrThrow(), 
resourceLabelBuilder.buildOrThrow());
+  }
+
+  private static String middleCrop(String value, int maxSize) {
+    return abbreviateMiddle(value, "...", maxSize);
+  }
+
+  private static Severity severityFor(Level level) {
+    if (level instanceof LoggingLevel) {
+      return ((LoggingLevel) level).getSeverity();
+    }
+    // Choose the severity based on Level range, rounding down.
+    // The assumption is that Level values below maintain same numeric value
+    int value = level.intValue();
+    if (value < Level.INFO.intValue()) {
+      // Includes Level.CONFIG, Level.FINE, Level.FINER, Level.FINEST
+      return Severity.DEBUG;
+    } else if (value < Level.WARNING.intValue()) {
+      return Severity.INFO;
+    } else if (value < Level.SEVERE.intValue()) {
+      return Severity.WARNING;
+    } else if (value < Level.OFF.intValue()) {
+      return Severity.ERROR;
+    }
+    // Level.OFF is special meaning not to log.
+    return Severity.NONE;
   }
 
-  public synchronized void publish(DataflowExecutionState 
currentExecutionState, LogRecord record) {
-    if (!isLoggable(record)) {
+  private static void addLogField(
+      Struct.Builder builder, String field, @Nullable String value, int 
maxSize) {
+    if (value == null || value.isEmpty()) {
       return;
     }
+    builder.putFieldsBuilderIfAbsent(field).setStringValue(middleCrop(value, 
maxSize));
+  }
+
+  @VisibleForTesting
+  LogEntry constructDirectLogEntry(
+      LogRecord record,
+      @Nullable DataflowExecutionState executionState,
+      ImmutableMap<String, String> defaultResourceLabels) {
+    Struct.Builder payloadBuilder = Struct.newBuilder();
+    addLogField(payloadBuilder, "logger", record.getLoggerName(), 
FIELD_MAX_LENGTH);
+    addLogField(
+        payloadBuilder, "message", getFormatter().formatMessage(record), 
MESSAGE_MAX_LENGTH);
+    addLogField(
+        payloadBuilder, "exception", formatException(record.getThrown()), 
MESSAGE_MAX_LENGTH);
+    addLogField(payloadBuilder, "thread", 
String.valueOf(record.getThreadID()), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "stage", 
DataflowWorkerLoggingMDC.getStageName(), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "worker", 
DataflowWorkerLoggingMDC.getWorkerId(), FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "work", DataflowWorkerLoggingMDC.getWorkId(), 
FIELD_MAX_LENGTH);
+    addLogField(payloadBuilder, "job", DataflowWorkerLoggingMDC.getJobId(), 
FIELD_MAX_LENGTH);
+    if (logCustomMdc) {
+      @Nullable Map<String, String> mdcMap = MDC.getCopyOfContextMap();
+      if (mdcMap != null && !mdcMap.isEmpty()) {
+        Struct.Builder customDataBuilder =
+            
payloadBuilder.putFieldsBuilderIfAbsent("custom_data").getStructValueBuilder();
+        mdcMap.entrySet().stream()
+            .sorted(Map.Entry.comparingByKey())
+            .forEach(
+                (entry) -> {
+                  if (entry.getKey() != null) {
+                    addLogField(
+                        customDataBuilder, entry.getKey(), entry.getValue(), 
FIELD_MAX_LENGTH);
+                  }
+                });
+      }
+    }
+
+    @Nullable String stepId = null;
+    if (executionState != null) {
+      @Nullable NameContext nameContext = executionState.getStepName();
+      if (nameContext != null) {
+        stepId = nameContext.userName();
+      }
+    }
+    addLogField(payloadBuilder, "step", stepId, FIELD_MAX_LENGTH);
+
+    LogEntry.Builder builder =
+        LogEntry.newBuilder(Payload.JsonPayload.of(payloadBuilder.build()))
+            .setTimestamp(Instant.ofEpochMilli(record.getMillis()))
+            .setSeverity(severityFor(record.getLevel()));
+
+    if (stepId != null) {
+      builder.setResource(
+          MonitoredResource.newBuilder(RESOURCE_TYPE)
+              .setLabels(defaultResourceLabels)
+              .addLabel(STEP_RESOURCE_LABEL, stepId)
+              .build());
+    }
+
+    return builder.build();
+  }
+
  /**
   * Enables logging of records directly to Cloud Logging instead of logging to the disk. Should
   * only be called once.
   *
   * @param pipelineOptions the pipelineOptions, used to configure buffers etc.
   * @param defaultNonDirectLogLevel the default level at which logs should be logged to disk.
   *     LogEntries that are below this level will be logged directly to Cloud Logging. The behavior
   *     for a specific LogEntry can be overridden by attaching a ResourceBundle obtained from
   *     resourceBundleForNonDirectLogLevelHint to it.
   * @param testDirectLogInterceptor if non-null, no Cloud Logging client is created and entries
   *     are instead delivered to this consumer (test-only path).
   */
  synchronized void enableDirectLogging(
      PipelineOptions pipelineOptions,
      Level defaultNonDirectLogLevel,
      @Nullable Consumer<LogEntry> testDirectLogInterceptor) {
    // Fail fast if direct logging was already enabled; the final getAndSet below re-checks
    // atomically in case of a race with another caller.
    checkState(
        directLoggingState.get() == null,
        "enableDirectLogging should only be called once on a DataflowWorkerLoggingHandler");
    // Left: labels attached to each LogEntry; right: labels for the MonitoredResource.
    Pair<ImmutableMap<String, String>, ImmutableMap<String, String>> labels =
        labelsFromOptionsAndMetadata(pipelineOptions);
    ImmutableMap<String, String> defaultLabels = labels.getLeft();
    ImmutableMap<String, String> defaultResourceLabels = labels.getRight();
    // Resource used for entries that have no associated step; the step label is left empty.
    MonitoredResource steplessMonitoredResource =
        MonitoredResource.newBuilder(RESOURCE_TYPE)
            .setLabels(defaultResourceLabels)
            .addLabel(STEP_RESOURCE_LABEL, "")
            .build();

    DataflowWorkerLoggingOptions dfLoggingOptions =
        pipelineOptions.as(DataflowWorkerLoggingOptions.class);
    directThrottler.setCooldownDuration(
        Duration.ofSeconds(dfLoggingOptions.getDirectLoggingCooldownSeconds()));

    @Nullable Logging logging = null;
    ArrayList<Logging.WriteOption> writeOptions = new ArrayList<>();
    if (testDirectLogInterceptor == null) {
      // Override some of the default settings. ThrowException is chosen so that a full buffer
      // surfaces as an enqueue failure (handled by the throttler) instead of blocking the
      // logging thread.
      LoggingOptions cloudLoggingOptions =
          LoggingOptions.newBuilder()
              .setBatchingSettings(
                  BatchingSettings.newBuilder()
                      .setFlowControlSettings(
                          FlowControlSettings.newBuilder()
                              .setLimitExceededBehavior(
                                  FlowController.LimitExceededBehavior.ThrowException)
                              .setMaxOutstandingRequestBytes(
                                  dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit())
                              .setMaxOutstandingElementCount(
                                  dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit())
                              .build())
                      // These thresholds match the default settings in
                      // LoggingServiceV2StubSettings.java for writeLogEntries.  We must re-set them
                      // when we override the flow control settings because otherwise
                      // BatchingSettings defaults to no batching.
                      .setElementCountThreshold(
                          Math.min(
                              1000L,
                              Math.max(
                                  1L,
                                  dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit() / 2)))
                      .setRequestByteThreshold(
                          Math.min(
                              1048576L,
                              Math.max(
                                  1, dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit() / 2)))
                      .setDelayThresholdDuration(Duration.ofMillis(50L))
                      .build())
              .build();

      writeOptions.add(Logging.WriteOption.labels(defaultLabels));
      writeOptions.add(Logging.WriteOption.logName(LOG_TYPE));
      writeOptions.add(Logging.WriteOption.resource(steplessMonitoredResource));

      logging = cloudLoggingOptions.getService();
      // Async writes with no severity-triggered flushing: entries are batched and flushed by
      // the batching settings above (or by flush()).
      logging.setFlushSeverity(Severity.NONE);
      logging.setWriteSynchronicity(Synchronicity.ASYNC);
    }

    // Atomically install the state; a non-null previous value means a concurrent/second call.
    checkState(
        directLoggingState.getAndSet(
                new DirectLoggingState(
                    defaultLabels,
                    defaultResourceLabels,
                    steplessMonitoredResource,
                    defaultNonDirectLogLevel,
                    Boolean.TRUE.equals(dfLoggingOptions.getDirectLoggingFallbackToDiskOnErrors()),
                    logging,
                    Iterables.toArray(writeOptions, Logging.WriteOption.class),
                    testDirectLogInterceptor))
            == null,
        "enableDirectLogging should only be called once on a DataflowWorkerLoggingHandler");
  }
+
+  private static @Nullable DataflowExecutionState 
getCurrentDataflowExecutionState() {
+    @Nullable ExecutionState currentState = 
ExecutionStateTracker.getCurrentExecutionState();
+    if (!(currentState instanceof DataflowExecutionState)) {
+      return null;
+    }
+    return (DataflowExecutionState) currentState;
+  }
+
+  @Override
+  public void publish(@Nullable LogRecord record) {
+    if (record == null) {
+      return;
+    }
+    publish(getCurrentDataflowExecutionState(), record);
+  }
+
  /**
   * Publishes a record, routing it either directly to Cloud Logging or to the on-disk log.
   *
   * <p>Direct delivery is attempted only when direct logging is enabled and the record is
   * configured for it (see isConfiguredDirectLog). On throttling or enqueue failure the record
   * falls back to disk unless fallback was disabled, in which case it is dropped and a
   * rate-limited error is written to disk instead.
   */
  public void publish(@Nullable DataflowExecutionState executionState, LogRecord record) {
    @Nullable DirectLoggingState direct = directLoggingState.get();
    if (direct != null && isConfiguredDirectLog(record, direct.defaultNonDirectLogLevel)) {
      if (directThrottler.shouldAttemptDirectLog()) {
        try {
          LogEntry logEntry =
              constructDirectLogEntry(record, executionState, direct.defaultResourceLabels);
          if (direct.directLogging != null) {
            checkNotNull(direct.directLogging)
                .write(ImmutableList.of(logEntry), direct.directWriteOptions);
          } else {
            // This is the testing path when testDirectLogInterceptor was specified.
            // The default labels are applied by write options generally but we merge them in here
            // so they are visible to the test.
            HashMap<String, String> mergedLabels = new HashMap<>(direct.defaultLabels);
            mergedLabels.putAll(logEntry.getLabels());
            logEntry = logEntry.toBuilder().setLabels(mergedLabels).build();
            if (logEntry.getResource() == null) {
              logEntry = logEntry.toBuilder().setResource(direct.steplessMonitoredResource).build();
            }
            checkNotNull(direct.testDirectLogInterceptor).accept(logEntry);
          }
          // Success ends any active failure episode in the throttler.
          directThrottler.noteDirectLoggingEnqueueSuccess();
          return;
        } catch (RuntimeException e) {
          // Typically a full buffer (flow control configured to throw). Returns a non-null
          // message at most once per failure episode / rate-limit window.
          @Nullable String log = directThrottler.noteDirectLoggingEnqueueFailure();
          if (log != null) {
            printErrorToDisk(log, e);
          }
        }
      }

      // Either we were throttled or encountered an error enqueuing the log.
      if (!direct.fallbackDirectErrorsToDisk) {
        // Fallback disabled: drop the record, logging the drop to disk at most every 5 minutes.
        boolean shouldLog = false;
        Instant now = Instant.now();
        synchronized (this) {
          if (now.isAfter(nextDirectFallbackLogInstant)) {
            nextDirectFallbackLogInstant = now.plusSeconds(300);
            shouldLog = true;
          }
        }
        if (shouldLog) {
          printErrorToDisk(
              "Unable to buffer log to send directly to cloud logging and fallback is disabled, dropping log records.",
              null);
        }
        return;
      }
    }
    // Not a direct log, or direct delivery failed with fallback enabled: write to disk.
    publishToDisk(executionState, record);
  }
+
+  private void printErrorToDisk(String errorMsg, @Nullable Exception ex) {
+    LogRecord record = new LogRecord(Level.SEVERE, errorMsg);
+    record.setLoggerName(DataflowWorkerLoggingHandler.class.getName());
+    if (ex != null) {
+      record.setThrown(ex);
+    }
+    publishToDisk(null, record);
+  }
+
+  @SuppressWarnings({"nullness", "GuardedBy"})
+  public synchronized void publishToDisk(
+      @Nullable DataflowExecutionState currentExecutionState, LogRecord 
record) {
 
     rolloverOutputStreamIfNeeded();
 
+    if (generator == null) {
+      throw new IllegalStateException("rollover should have made generator");
+    }
     try {
       // Generating a JSON map like:
       // {"timestamp": {"seconds": 1435835832, "nanos": 123456789}, ...  
"message": "hello"}
@@ -168,21 +591,21 @@ public class DataflowWorkerLoggingHandler extends Handler 
{
           "severity",
           MoreObjects.firstNonNull(LEVELS.get(record.getLevel()), 
record.getLevel().getName()));
       // Write the other labels.
-      writeIfNotEmpty("message", getFormatter().formatMessage(record));
-      writeIfNotEmpty("thread", String.valueOf(record.getThreadID()));
-      writeIfNotEmpty("job", DataflowWorkerLoggingMDC.getJobId());
-      writeIfNotEmpty("stage", DataflowWorkerLoggingMDC.getStageName());
+      writeIfNotEmpty(generator, "message", 
getFormatter().formatMessage(record));
+      writeIfNotEmpty(generator, "thread", 
String.valueOf(record.getThreadID()));
+      writeIfNotEmpty(generator, "job", DataflowWorkerLoggingMDC.getJobId());
+      writeIfNotEmpty(generator, "stage", 
DataflowWorkerLoggingMDC.getStageName());
 
       if (currentExecutionState != null) {
         NameContext nameContext = currentExecutionState.getStepName();
         if (nameContext != null) {
-          writeIfNotEmpty("step", nameContext.userName());
+          writeIfNotEmpty(generator, "step", nameContext.userName());
         }
       }
-      writeIfNotEmpty("worker", DataflowWorkerLoggingMDC.getWorkerId());
-      writeIfNotEmpty("work", DataflowWorkerLoggingMDC.getWorkId());
-      writeIfNotEmpty("logger", record.getLoggerName());
-      writeIfNotEmpty("exception", formatException(record.getThrown()));
+      writeIfNotEmpty(generator, "worker", 
DataflowWorkerLoggingMDC.getWorkerId());
+      writeIfNotEmpty(generator, "work", DataflowWorkerLoggingMDC.getWorkId());
+      writeIfNotEmpty(generator, "logger", record.getLoggerName());
+      writeIfNotEmpty(generator, "exception", 
formatException(record.getThrown()));
       if (logCustomMdc) {
         @Nullable Map<String, String> mdcMap = MDC.getCopyOfContextMap();
         if (mdcMap != null && !mdcMap.isEmpty()) {
@@ -212,78 +635,131 @@ public class DataflowWorkerLoggingHandler extends 
Handler {
     // entries will be the inverse of the flush latency. That could be as 
little as one hundred
     // log entries per second on some systems. For higher throughput this 
should be changed to
     // batch publish operations while writes and flushes are in flight on a 
different thread.
-    flush();
+    flushDisk();
   }
 
-  public synchronized void publish(BeamFnApi.LogEntry logEntry) {
-    if (generator == null || logEntry == null) {
-      return;
+  @VisibleForTesting
+  static boolean isConfiguredDirectLog(LogRecord record, Level 
defaultNonDirectLogLevel) {
+    @Nullable ResourceBundle resourceBundle = record.getResourceBundle();
+    Level nonDirectLogLevel =
+        (resourceBundle instanceof DirectHintResourceBundle)
+            ? ((DirectHintResourceBundle) resourceBundle).nonDirectLogLevel
+            : defaultNonDirectLogLevel;
+    return nonDirectLogLevel.intValue() > record.getLevel().intValue();
+  }
+
+  @VisibleForTesting
+  boolean testVerifyIsConfiguredDirectLog(LogRecord record) {
+    return isConfiguredDirectLog(
+        record, 
checkNotNull(directLoggingState).get().defaultNonDirectLogLevel);
+  }
+
+  @VisibleForTesting
+  static final class DirectLoggingThrottler {
+    private final AtomicBoolean rejectingRequests = new AtomicBoolean(false);
+    private final Supplier<Instant> nowSupplier;
+
+    DirectLoggingThrottler(Supplier<Instant> nowSupplier) {
+      this.nowSupplier = nowSupplier;
     }
 
-    rolloverOutputStreamIfNeeded();
+    @GuardedBy("this")
+    private Duration cooldownDuration = Duration.ZERO;
 
-    try {
-      // Generating a JSON map like:
-      // {"timestamp": {"seconds": 1435835832, "nanos": 123456789}, ...  
"message": "hello"}
-      generator.writeStartObject();
-      // Write the timestamp.
-      generator.writeFieldName("timestamp");
-      generator.writeStartObject();
-      generator.writeNumberField("seconds", 
logEntry.getTimestamp().getSeconds());
-      generator.writeNumberField("nanos", logEntry.getTimestamp().getNanos());
-      generator.writeEndObject();
-      // Write the severity.
-      generator.writeObjectField(
-          "severity",
-          MoreObjects.firstNonNull(
-              BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.get(logEntry.getSeverity()),
-              logEntry.getSeverity().getValueDescriptor().getName()));
-      // Write the other labels.
-      writeIfNotEmpty("message", logEntry.getMessage());
-      writeIfNotEmpty("thread", logEntry.getThread());
-      writeIfNotEmpty("job", DataflowWorkerLoggingMDC.getJobId());
-      // TODO: Write the stage execution information by translating the 
currently execution
-      // instruction id to a stage.
-      // writeIfNotNull("stage", ...);
-      writeIfNotEmpty("step", logEntry.getTransformId());
-      writeIfNotEmpty("worker", DataflowWorkerLoggingMDC.getWorkerId());
-      // Id should match to id in 
//depot/google3/third_party/cloud/dataflow/worker/agent/sdk.go
-      writeIfNotEmpty("portability_worker_id", 
DataflowWorkerLoggingMDC.getSdkHarnessId());
-      writeIfNotEmpty("work", logEntry.getInstructionId());
-      writeIfNotEmpty("logger", logEntry.getLogLocation());
-      // TODO: Figure out a way to get exceptions transported across Beam Fn 
Logging API
-      writeIfNotEmpty("exception", logEntry.getTrace());
-      generator.writeEndObject();
-      generator.writeRaw(System.lineSeparator());
-    } catch (IOException | RuntimeException e) {
-      reportFailure("Unable to publish", e, ErrorManager.WRITE_FAILURE);
+    @GuardedBy("this")
+    private Instant rejectRequestsUntil = Instant.MIN;
+
+    @GuardedBy("this")
+    private Instant failureStartTime = Instant.MAX;
+
+    @GuardedBy("this")
+    private Instant nextLogTime = Instant.MIN;
+
+    @GuardedBy("this")
+    private Instant nextSendErrorLogTime = Instant.MIN;
+
    /** Sets how long direct logging stays disabled after an enqueue failure or error. */
    synchronized void setCooldownDuration(Duration cooldownDuration) {
      this.cooldownDuration = cooldownDuration;
    }
 
-    // This implementation is based on that of java.util.logging.FileHandler, 
which flushes in a
-    // synchronized context like this. Unfortunately the maximum throughput 
for generating log
-    // entries will be the inverse of the flush latency. That could be as 
little as one hundred
-    // log entries per second on some systems. For higher throughput this 
should be changed to
-    // batch publish operations while writes and flushes are in flight on a 
different thread.
-    flush();
+    boolean shouldAttemptDirectLog() {
+      if (!rejectingRequests.get()) {
+        return true;
+      }
+
+      Instant now = nowSupplier.get();
+      synchronized (this) {
+        return !now.isBefore(rejectRequestsUntil);
+      }
+    }
+
+    void noteDirectLoggingEnqueueSuccess() {
+      if (!rejectingRequests.get()) {
+        return;
+      }
+
+      synchronized (this) {
+        rejectingRequests.set(false);
+        failureStartTime = Instant.MAX;
+      }
+    }
+
    /**
     * Records a failed enqueue attempt, starting rejection of direct logs.
     *
     * <p>Returns a message to surface to the on-disk log at most once per minute, or null.
     *
     * <p>NOTE(review): the cooldown deadline is only armed on the first failure of an episode
     * (failureStartTime == MAX); repeated failures after the cooldown expires do not extend
     * rejectRequestsUntil until a success resets the episode — confirm this is intended.
     */
    synchronized @Nullable String noteDirectLoggingEnqueueFailure() {
      rejectingRequests.set(true);
      if (failureStartTime.equals(Instant.MAX)) {
        // First failure of this episode: start the cooldown window.
        Instant now = nowSupplier.get();
        failureStartTime = now;
        rejectRequestsUntil = now.plus(cooldownDuration);
        if (nextLogTime.isBefore(now)) {
          // Rate-limit the disk-side notification to once per minute.
          nextLogTime = now.plusSeconds(60);
          return String.format(
              "Failed to buffer log to send directly to cloud logging, falling back to normal logging path for %s. Consider increasing the buffer size with --workerDirectLoggerBufferByteLimit and --workerDirectLoggerBufferElementLimit.",
              cooldownDuration);
        }
      }
      return null;
    }
+
    /**
     * Records an asynchronous direct-logging error (e.g. reported via the logging
     * implementation's stderr output) and (re)arms the cooldown.
     *
     * <p>Unlike enqueue failures, every error extends rejectRequestsUntil. Returns a message to
     * surface to the on-disk log at most once per minute, or null.
     */
    synchronized @Nullable String noteDirectLoggingError() {
      rejectingRequests.set(true);
      Instant now = nowSupplier.get();
      if (failureStartTime.equals(Instant.MAX)) {
        failureStartTime = now;
      }
      rejectRequestsUntil = now.plus(cooldownDuration);
      if (nextSendErrorLogTime.isBefore(now)) {
        // Rate-limit the disk-side notification to once per minute.
        nextSendErrorLogTime = now.plusSeconds(60);
        return String.format(
            "Error occurred while handling direct logs, falling back to normal logging path for %s.",
            cooldownDuration);
      }
      return null;
    }
   }
 
-  /**
-   * Check if a LogRecord will be logged.
-   *
-   * <p>This method checks if the <tt>LogRecord</tt> has an appropriate level 
and whether it
-   * satisfies any <tt>Filter</tt>. It will also return false if the handler 
has been closed, or the
-   * LogRecord is null.
-   */
-  @Override
-  public boolean isLoggable(LogRecord record) {
-    return generator != null && record != null && super.isLoggable(record);
+  private final DirectLoggingThrottler directThrottler = new 
DirectLoggingThrottler(Instant::now);
+
+  void publishStdErr(LogRecord record) {
+    @Nullable String msg = record.getMessage();
+    if (msg != null && msg.startsWith(LOGGING_IMPL_STDERR_PREFIX)) {
+      @Nullable String throttleMsg = directThrottler.noteDirectLoggingError();
+      if (throttleMsg != null) {
+        printErrorToDisk(throttleMsg, null);
+      }
+    }
+    publishToDisk(getCurrentDataflowExecutionState(), record);
   }
 
-  @Override
-  public synchronized void flush() {
  /** Publishes a record captured from System.out; stdout records always go to the on-disk log. */
  void publishStdOut(LogRecord record) {
    publishToDisk(getCurrentDataflowExecutionState(), record);
  }
+
+  private void flushDisk() {
     try {
-      if (generator != null) {
-        generator.flush();
+      synchronized (this) {
+        if (generator != null) {
+          generator.flush();
+        }
       }
     } catch (IOException | RuntimeException e) {
       reportFailure("Unable to flush", e, ErrorManager.FLUSH_FAILURE);
@@ -291,20 +767,39 @@ public class DataflowWorkerLoggingHandler extends Handler 
{
   }
 
   @Override
-  public synchronized void close() {
-    // Flush any in-flight content, though there should not actually be any 
because
-    // the generator is currently flushed in the synchronized publish() method.
-    flush();
-    // Close the generator and log file.
+  public void flush() {
+    flushDisk();
+    @Nullable DirectLoggingState direct = directLoggingState.get();
+    if (direct != null && direct.directLogging != null) {
+      direct.directLogging.flush();
+    }
+  }
+
+  @Override
+  public void close() {
+    synchronized (this) {
+      // Flush any in-flight content, though there should not actually be any 
because
+      // the generator is currently flushed in the synchronized publish() 
method.
+      flush();
+      // Close the generator and log file.
+      try {
+        if (generator != null) {
+          generator.close();
+        }
+      } catch (IOException | RuntimeException e) {
+        reportFailure("Unable to close", e, ErrorManager.CLOSE_FAILURE);
+      } finally {
+        generator = null;
+        counter = null;
+      }
+    }
     try {
-      if (generator != null) {
-        generator.close();
+      @Nullable DirectLoggingState direct = directLoggingState.get();
+      if (direct != null && direct.directLogging != null) {
+        direct.directLogging.close();
       }
-    } catch (IOException | RuntimeException e) {
+    } catch (Exception e) {
       reportFailure("Unable to close", e, ErrorManager.CLOSE_FAILURE);
-    } finally {
-      generator = null;
-      counter = null;
     }
   }
 
@@ -330,7 +825,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
     }
   }
 
-  private void createOutputStream() throws IOException {
+  private synchronized void createOutputStream() throws IOException {
     CountingOutputStream stream = new 
CountingOutputStream(outputStreamFactory.get());
     generator = generatorFactory.createGenerator(stream, JsonEncoding.UTF8);
     counter = stream;
@@ -343,20 +838,24 @@ public class DataflowWorkerLoggingHandler extends Handler 
{
    * Rollover to a new output stream (log file) if we have reached the size 
limit. Ensure that the
    * rollover fails or succeeds atomically.
    */
-  private void rolloverOutputStreamIfNeeded() {
+  private synchronized void rolloverOutputStreamIfNeeded() {
+    if (counter == null) {
+      throw new IllegalStateException("");
+    }
     if (counter.getCount() < sizeLimit) {
       return;
     }
 
     try {
-      JsonGenerator old = generator;
+      @Nullable JsonGenerator old = generator;
       createOutputStream();
-
-      try {
-        // Rollover successful. Attempt to close old stream, but ignore on 
failure.
-        old.close();
-      } catch (IOException | RuntimeException e) {
-        reportFailure("Unable to close old log file", e, 
ErrorManager.CLOSE_FAILURE);
+      if (old != null) {
+        try {
+          // Rollover successful. Attempt to close old stream, but ignore on 
failure.
+          old.close();
+        } catch (IOException | RuntimeException e) {
+          reportFailure("Unable to close old log file", e, 
ErrorManager.CLOSE_FAILURE);
+        }
       }
     } catch (IOException | RuntimeException e) {
       reportFailure("Unable to create new log file", e, 
ErrorManager.OPEN_FAILURE);
@@ -364,7 +863,8 @@ public class DataflowWorkerLoggingHandler extends Handler {
   }
 
   /** Appends a JSON key/value pair if the specified val is not null. */
-  private void writeIfNotEmpty(String name, String val) throws IOException {
+  private static void writeIfNotEmpty(JsonGenerator generator, String name, 
String val)
+      throws IOException {
     if (val != null && !val.isEmpty()) {
       generator.writeStringField(name, val);
     }
@@ -381,12 +881,4 @@ public class DataflowWorkerLoggingHandler extends Handler {
       // Failed to report logging failure. No meaningful action left.
     }
   }
-
-  // Null after close().
-  private JsonGenerator generator;
-  private CountingOutputStream counter;
-
-  private final long sizeLimit;
-  private final Supplier<OutputStream> outputStreamFactory;
-  private final JsonFactory generatorFactory;
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
index a56c62e9231..0f1b5c2750b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
@@ -24,16 +24,19 @@ import static 
org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOpti
 import static 
org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.TRACE;
 import static 
org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.WARN;
 
+import com.google.cloud.logging.LogEntry;
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.charset.Charset;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.function.Consumer;
 import java.util.logging.ErrorManager;
 import java.util.logging.Level;
 import java.util.logging.LogManager;
 import java.util.logging.Logger;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import org.apache.beam.sdk.options.SdkHarnessOptions;
 import org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel;
@@ -77,8 +80,8 @@ public class DataflowWorkerLoggingInitializer {
 
   private static final String FILESIZE_MB_PROPERTY = 
"dataflow.worker.logging.filesize_mb";
 
-  private static final String SYSTEM_OUT_LOG_NAME = "System.out";
-  private static final String SYSTEM_ERR_LOG_NAME = "System.err";
+  static final String SYSTEM_OUT_LOG_NAME = "System.out";
+  static final String SYSTEM_ERR_LOG_NAME = "System.err";
 
   static final ImmutableBiMap<Level, DataflowWorkerLoggingOptions.Level> 
LEVELS =
       ImmutableBiMap.<Level, DataflowWorkerLoggingOptions.Level>builder()
@@ -108,6 +111,7 @@ public class DataflowWorkerLoggingInitializer {
   private static PrintStream originalStdOut;
   private static PrintStream originalStdErr = System.err;
   private static boolean initialized = false;
+  @VisibleForTesting static @Nullable Consumer<LogEntry> 
testDirectLoggingInterceptor = null;
 
   // This is the same as ErrorManager except that it uses the provided
   // print stream.
@@ -174,10 +178,13 @@ public class DataflowWorkerLoggingInitializer {
       originalStdErr = System.err;
       System.setOut(
           JulHandlerPrintStreamAdapterFactory.create(
-              loggingHandler, SYSTEM_OUT_LOG_NAME, Level.INFO, 
Charset.defaultCharset()));
+              loggingHandler::publish, SYSTEM_OUT_LOG_NAME, Level.INFO, 
Charset.defaultCharset()));
       System.setErr(
           JulHandlerPrintStreamAdapterFactory.create(
-              loggingHandler, SYSTEM_ERR_LOG_NAME, Level.SEVERE, 
Charset.defaultCharset()));
+              loggingHandler::publish,
+              SYSTEM_ERR_LOG_NAME,
+              Level.SEVERE,
+              Charset.defaultCharset()));
 
       // Initialize the SDK Logging Handler, which will only be used for the 
LoggingService
       sdkLoggingHandler = makeLoggingHandler(SDK_FILEPATH_PROPERTY, 
DEFAULT_SDK_LOGGING_LOCATION);
@@ -188,6 +195,65 @@ public class DataflowWorkerLoggingInitializer {
     }
   }
 
+  private static class LevelOverrides {
+    LevelOverrides(Level disk, Level direct) {
+      this.direct = direct;
+      this.disk = disk;
+    }
+
+    final Level disk;
+    final Level direct;
+
+    LevelOverrides merge(@Nullable Level disk, @Nullable Level direct) {
+      return new LevelOverrides(
+          disk == null ? this.disk : disk, direct == null ? this.direct : 
direct);
+    }
+  }
+
+  private static void applyOverridesToLogger(
+      Logger logger,
+      LevelOverrides overrides,
+      Level defaultNonDirectLogLevel,
+      boolean directLoggingEnabled) {
+    if (!directLoggingEnabled) {
+      logger.setLevel(overrides.disk);
+      if (overrides.disk.intValue() != Level.OFF.intValue()) {
+        LOG.warn(
+            "Ignoring the disk logging level override for {} because 
--defaultWorkerDirectLoggerLevel was OFF.",
+            logger.getName());
+      }
+      return;
+    }
+
+    // Configure the logger to accept logs of the lower value. The log will be 
sent directly based
+    // upon the default
+    // nonDirectLogLevel or the provided hint.
+    if (overrides.direct.intValue() < overrides.disk.intValue()) {
+      logger.setLevel(overrides.direct);
+      if (overrides.disk.intValue() != defaultNonDirectLogLevel.intValue()) {
+        logger.setResourceBundle(
+            
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(overrides.disk));
+      }
+    } else {
+      if (overrides.direct.intValue() != Level.OFF.intValue()) {
+        LOG.warn(
+            "Ignoring the direct logging level for {} as it is greater or 
equal to the disk logging level, {} vs {}",
+            overrides.direct,
+            overrides.disk,
+            logger.getName());
+      }
+      logger.setLevel(overrides.disk);
+      if (overrides.disk.intValue() < defaultNonDirectLogLevel.intValue()) {
+        // We need to provide the hint as otherwise the default would be used 
and send the log
+        // directly which is not
+        // desired. We don't bother if the threshold was increased as the 
right decision would still
+        // be made.
+        logger.setResourceBundle(
+            
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(overrides.disk));
+      }
+    }
+  }
+
   /** Reconfigures logging with the passed in options. */
   public static synchronized void configure(DataflowWorkerLoggingOptions 
options) {
     if (!initialized) {
@@ -200,38 +266,105 @@ public class DataflowWorkerLoggingInitializer {
     SdkHarnessOptions harnessOptions = options.as(SdkHarnessOptions.class);
     boolean usedDeprecated = false;
 
-    // default value for both DefaultSdkHarnessLogLevel and 
DefaultWorkerLogLevel are INFO
-    Level overrideLevel = 
getJulLevel(harnessOptions.getDefaultSdkHarnessLogLevel());
+    // default value for both DefaultSdkHarnessLogLevel and 
DefaultWorkerLogLevel are INFO for disk
+    // logging and OFF for direct logging.
+    LevelOverrides defaultOverrides = new LevelOverrides(Level.INFO, 
Level.OFF);
     if (options.getDefaultWorkerLogLevel() != null && 
options.getDefaultWorkerLogLevel() != INFO) {
-      overrideLevel = getJulLevel(options.getDefaultWorkerLogLevel());
+      defaultOverrides =
+          
defaultOverrides.merge(getJulLevel(options.getDefaultWorkerLogLevel()), null);
       usedDeprecated = true;
+    } else {
+      defaultOverrides =
+          
defaultOverrides.merge(getJulLevel(harnessOptions.getDefaultSdkHarnessLogLevel()),
 null);
+    }
+    boolean directLoggingEnabled = false;
+    if (options.getDefaultWorkerDirectLoggerLevel() != null
+        && options.getDefaultWorkerDirectLoggerLevel() != OFF) {
+      Level diskLevel = defaultOverrides.disk;
+      Level directLevel = 
getJulLevel(options.getDefaultWorkerDirectLoggerLevel());
+      LOG.info(
+          "Enabling sending logs directly to Cloud Logging between severity {} 
and severity {}.",
+          directLevel,
+          diskLevel);
+      try {
+        loggingHandler.enableDirectLogging(options, diskLevel, 
testDirectLoggingInterceptor);
+        LOG.info("Configuring sending to cloud logging directly succeeded.");
+        defaultOverrides = defaultOverrides.merge(null, directLevel);
+        directLoggingEnabled = true;
+      } catch (RuntimeException e) {
+        LOG.error("Unable to configure logs to be sent directly to cloud 
logging", e);
+      }
     }
-    
LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).setLevel(overrideLevel);
 
+    final LevelOverrides finalDefaultOverrides = defaultOverrides;
+    HashMap<String, LevelOverrides> loggerLevelOverrides = new HashMap<>();
     if (options.getWorkerLogLevelOverrides() != null) {
-      for (Map.Entry<String, DataflowWorkerLoggingOptions.Level> 
loggerOverride :
-          options.getWorkerLogLevelOverrides().entrySet()) {
-        Logger logger = Logger.getLogger(loggerOverride.getKey());
-        logger.setLevel(getJulLevel(loggerOverride.getValue()));
-        configuredLoggers.add(logger);
-      }
+      options
+          .getWorkerLogLevelOverrides()
+          .forEach(
+              (k, v) -> {
+                loggerLevelOverrides.compute(
+                    k,
+                    (lk, lv) -> {
+                      if (lv == null) {
+                        lv = finalDefaultOverrides;
+                      }
+                      return lv.merge(getJulLevel(v), null);
+                    });
+              });
       usedDeprecated = true;
     } else if (harnessOptions.getSdkHarnessLogLevelOverrides() != null) {
-      for (Map.Entry<String, SdkHarnessOptions.LogLevel> loggerOverride :
-          harnessOptions.getSdkHarnessLogLevelOverrides().entrySet()) {
-        Logger logger = Logger.getLogger(loggerOverride.getKey());
-        logger.setLevel(getJulLevel(loggerOverride.getValue()));
-        configuredLoggers.add(logger);
-      }
+      harnessOptions
+          .getSdkHarnessLogLevelOverrides()
+          .forEach(
+              (k, v) -> {
+                loggerLevelOverrides.compute(
+                    k,
+                    (lk, lv) -> {
+                      if (lv == null) {
+                        lv = finalDefaultOverrides;
+                      }
+                      return lv.merge(getJulLevel(v), null);
+                    });
+              });
+    }
+    if (options.getWorkerDirectLogLevelOverrides() != null) {
+      options
+          .getWorkerDirectLogLevelOverrides()
+          .forEach(
+              (k, v) -> {
+                loggerLevelOverrides.compute(
+                    k,
+                    (lk, lv) -> {
+                      if (lv == null) {
+                        lv = finalDefaultOverrides;
+                      }
+                      return lv.merge(null, getJulLevel(v));
+                    });
+              });
     }
 
+    applyOverridesToLogger(
+        LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME),
+        defaultOverrides,
+        finalDefaultOverrides.disk,
+        directLoggingEnabled);
+    boolean finalDirectLoggingEnabled = directLoggingEnabled;
+    loggerLevelOverrides.forEach(
+        (loggerName, levelOverrides) -> {
+          Logger logger = Logger.getLogger(loggerName);
+          applyOverridesToLogger(
+              logger, levelOverrides, finalDefaultOverrides.disk, 
finalDirectLoggingEnabled);
+          configuredLoggers.add(logger);
+        });
+
     // If the options specify a level for messages logged to System.out/err, 
we need to reconfigure
     // the corresponding stream adapter.
     if (options.getWorkerSystemOutMessageLevel() != null) {
       System.out.close();
       System.setOut(
           JulHandlerPrintStreamAdapterFactory.create(
-              loggingHandler,
+              loggingHandler::publishStdOut,
               SYSTEM_OUT_LOG_NAME,
               getJulLevel(options.getWorkerSystemOutMessageLevel()),
               Charset.defaultCharset()));
@@ -241,13 +374,14 @@ public class DataflowWorkerLoggingInitializer {
       System.err.close();
       System.setErr(
           JulHandlerPrintStreamAdapterFactory.create(
-              loggingHandler,
+              loggingHandler::publishStdErr,
               SYSTEM_ERR_LOG_NAME,
               getJulLevel(options.getWorkerSystemErrMessageLevel()),
               Charset.defaultCharset()));
     }
 
     if (harnessOptions.getLogMdc()) {
+      LOG.info("Due to logMdc option, MDC fields will be added to custom_data 
field of logs.");
       loggingHandler.setLogMdc(true);
     }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
index 7e4f3a2cb18..92b27927f59 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
@@ -31,6 +31,7 @@ import java.nio.charset.CodingErrorAction;
 import java.util.Formatter;
 import java.util.Locale;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.logging.Handler;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
@@ -62,7 +63,7 @@ class JulHandlerPrintStreamAdapterFactory {
     /** Hold reference of named logger to check configured {@link Level}. */
     private final Logger logger;
 
-    private final Handler handler;
+    private final Consumer<LogRecord> handler;
     private final String loggerName;
     private final Level messageLevel;
 
@@ -79,7 +80,7 @@ class JulHandlerPrintStreamAdapterFactory {
     private ByteArrayOutputStream carryOverBytes;
 
     private JulHandlerPrintStream(
-        Handler handler, String loggerName, Level logLevel, Charset charset)
+        Consumer<LogRecord> handler, String loggerName, Level logLevel, 
Charset charset)
         throws UnsupportedEncodingException {
       super(
           new OutputStream() {
@@ -406,11 +407,11 @@ class JulHandlerPrintStreamAdapterFactory {
         if (OUTPUT_WARNING.compareAndSet(false, true)) {
           LogRecord log = new LogRecord(Level.WARNING, LOGGING_DISCLAIMER);
           log.setLoggerName(loggerName);
-          handler.publish(log);
+          handler.accept(log);
         }
         LogRecord log = new LogRecord(messageLevel, message);
         log.setLoggerName(loggerName);
-        handler.publish(log);
+        handler.accept(log);
       }
     }
   }
@@ -420,7 +421,7 @@ class JulHandlerPrintStreamAdapterFactory {
    * specified {@code loggerName} and {@code level}.
    */
   static PrintStream create(
-      Handler handler, String loggerName, Level messageLevel, Charset charset) 
{
+      Consumer<LogRecord> handler, String loggerName, Level messageLevel, 
Charset charset) {
     try {
       return new JulHandlerPrintStream(handler, loggerName, messageLevel, 
charset);
     } catch (UnsupportedEncodingException exc) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
index 3191228687c..0f4752de169 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
@@ -19,26 +19,44 @@ package org.apache.beam.runners.dataflow.worker.logging;
 
 import static 
org.apache.beam.runners.dataflow.worker.NameContextsForTests.nameContextForTest;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.cloud.logging.LogEntry;
+import com.google.cloud.logging.Payload;
+import com.google.cloud.logging.Severity;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.logging.Formatter;
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
 import java.util.logging.SimpleFormatter;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import 
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
 import org.apache.beam.runners.dataflow.worker.NameContextsForTests;
 import 
org.apache.beam.runners.dataflow.worker.TestOperationContext.TestDataflowExecutionState;
+import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler.DirectLoggingThrottler;
 import 
org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
-import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Timestamp;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -88,48 +106,60 @@ public class DataflowWorkerLoggingHandlerTest {
 
   /** Encodes a LogRecord into a Json string. */
   private static String createJson(LogRecord record) throws IOException {
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
-    DataflowWorkerLoggingHandler handler = new 
DataflowWorkerLoggingHandler(factory, 0);
-    // Format the record as JSON.
-    handler.publish(record);
-    // Decode the binary output as UTF-8 and return the generated string.
-    return new String(output.toByteArray(), StandardCharsets.UTF_8);
+    return createJson(record, null, null);
   }
 
-  private static String createJson(LogRecord record, Formatter formatter) 
throws IOException {
+  private static String createJson(
+      LogRecord record, @Nullable Formatter formatter, @Nullable Boolean 
enableMdc)
+      throws IOException {
     ByteArrayOutputStream output = new ByteArrayOutputStream();
     FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
     DataflowWorkerLoggingHandler handler = new 
DataflowWorkerLoggingHandler(factory, 0);
-    handler.setFormatter(formatter);
+    if (formatter != null) {
+      handler.setFormatter(formatter);
+    }
+    if (enableMdc != null) {
+      handler.setLogMdc(enableMdc);
+    }
     // Format the record as JSON.
     handler.publish(record);
     // Decode the binary output as UTF-8 and return the generated string.
     return new String(output.toByteArray(), StandardCharsets.UTF_8);
   }
 
-  private static String createJsonWithCustomMdc(LogRecord record) throws 
IOException {
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
-    DataflowWorkerLoggingHandler handler = new 
DataflowWorkerLoggingHandler(factory, 0);
-    handler.setLogMdc(true);
-    // Format the record as JSON.
-    handler.publish(record);
-    // Decode the binary output as UTF-8 and return the generated string.
-    return new String(output.toByteArray(), StandardCharsets.UTF_8);
+  private static LogEntry createLogEntry(LogRecord record) throws IOException {
+    return createLogEntry(record, null, null);
   }
 
-  /**
-   * Encodes a {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry} 
into a Json string.
-   */
-  private static String createJson(BeamFnApi.LogEntry record) throws 
IOException {
-    ByteArrayOutputStream output = new ByteArrayOutputStream();
-    FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
+  private static PipelineOptions pipelineOptionsForTest() {
+    PipelineOptionsFactory.register(DataflowWorkerHarnessOptions.class);
+    return PipelineOptionsFactory.fromArgs(
+            "--jobId=testJobId",
+            "--jobName=testJobName",
+            "--region=testRegion",
+            "--project=testProject",
+            "--workerId=testWorkerName",
+            "--directLoggingCooldownSeconds=10000")
+        .create();
+  }
+
+  private static LogEntry createLogEntry(
+      LogRecord record, @Nullable Formatter formatter, @Nullable Boolean 
enableMdc)
+      throws IOException {
+    ByteArrayOutputStream fileOutput = new ByteArrayOutputStream();
+    FixedOutputStreamFactory factory = new 
FixedOutputStreamFactory(fileOutput);
     DataflowWorkerLoggingHandler handler = new 
DataflowWorkerLoggingHandler(factory, 0);
-    // Format the record as JSON.
-    handler.publish(record);
-    // Decode the binary output as UTF-8 and return the generated string.
-    return new String(output.toByteArray(), StandardCharsets.UTF_8);
+    if (formatter != null) {
+      handler.setFormatter(formatter);
+    }
+    if (enableMdc != null) {
+      handler.setLogMdc(enableMdc);
+    }
+    handler.enableDirectLogging(pipelineOptionsForTest(), Level.SEVERE, (e) -> 
{});
+    return handler.constructDirectLogEntry(
+        record,
+        (DataflowExecutionState) 
ExecutionStateTracker.getCurrentExecutionState(),
+        ImmutableMap.of());
   }
 
   private final ExecutionStateTracker tracker = 
ExecutionStateTracker.newForTest();
@@ -164,7 +194,7 @@ public class DataflowWorkerLoggingHandlerTest {
         new DataflowWorkerLoggingHandler(factory, expected.length() + 1 /* 
sizelimit */);
 
     // Using |expected|+1 for size limit means that we will rollover after 
writing 2 log messages.
-    // We thus expect to see 2 messsages written to 'first' and 1 message to 
'second',
+    // We thus expect to see 2 messages written to 'first' and 1 message to 
'second',
 
     handler.publish(record);
     handler.publish(record);
@@ -201,7 +231,6 @@ public class DataflowWorkerLoggingHandlerTest {
     DataflowWorkerLoggingMDC.setWorkerId(testWorkerId);
     DataflowWorkerLoggingMDC.setWorkId(testWorkId);
 
-    createLogRecord("test.message", null /* throwable */);
     assertEquals(
         String.format(
             
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
@@ -250,7 +279,7 @@ public class DataflowWorkerLoggingHandlerTest {
               + 
"\"message\":\"testMdcValue:test.message\",\"thread\":\"2\",\"job\":\"testJobId\","
               + 
"\"worker\":\"testWorkerId\",\"work\":\"testWorkId\",\"logger\":\"LoggerName\"}"
               + System.lineSeparator(),
-          createJson(createLogRecord("test.message", null /* throwable */), 
customFormatter));
+          createJson(createLogRecord("test.message", null /* throwable */), 
customFormatter, null));
     }
   }
 
@@ -316,7 +345,7 @@ public class DataflowWorkerLoggingHandlerTest {
         
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
             + 
"\"message\":\"test.message\",\"thread\":\"2\",\"logger\":\"LoggerName\"}"
             + System.lineSeparator(),
-        createJsonWithCustomMdc(createLogRecord("test.message", null)));
+        createJson(createLogRecord(), null, true));
   }
 
   @Test
@@ -327,7 +356,7 @@ public class DataflowWorkerLoggingHandlerTest {
           
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
               + 
"\"message\":\"test.message\",\"thread\":\"2\",\"logger\":\"LoggerName\"}"
               + System.lineSeparator(),
-          createJson(createLogRecord("test.message", null)));
+          createJson(createLogRecord()));
     }
   }
 
@@ -340,7 +369,7 @@ public class DataflowWorkerLoggingHandlerTest {
               + 
"\"message\":\"test.message\",\"thread\":\"2\",\"logger\":\"LoggerName\","
               + "\"custom_data\":{\"key1\":\"cool 
value\",\"key2\":\"another\"}}"
               + System.lineSeparator(),
-          createJsonWithCustomMdc(createLogRecord("test.message", null)));
+          createJson(createLogRecord(), null, true));
     }
   }
 
@@ -358,33 +387,380 @@ public class DataflowWorkerLoggingHandlerTest {
         createJson(createLogRecord(null /* message */, null /* throwable */)));
   }
 
+  // Test the direct logging path except for actually sending it to cloud 
logging.
   @Test
-  public void testBeamFnApiLogEntry() throws IOException {
-    DataflowWorkerLoggingMDC.setJobId("testJobId");
+  public void testDirectLoggingEndToEnd() throws IOException {
     DataflowWorkerLoggingMDC.setWorkerId("testWorkerId");
     DataflowWorkerLoggingMDC.setWorkId("testWorkId");
 
+    ByteArrayOutputStream fileOutput = new ByteArrayOutputStream();
+    FixedOutputStreamFactory factory = new 
FixedOutputStreamFactory(fileOutput);
+    AtomicReference<LogEntry> capturedEntry = new AtomicReference<>();
+    DataflowWorkerLoggingHandler handler = new 
DataflowWorkerLoggingHandler(factory, 0);
+    handler.enableDirectLogging(
+        pipelineOptionsForTest(),
+        Level.SEVERE,
+        (LogEntry e) -> assertNull(capturedEntry.getAndSet(e)));
+    handler.publish(createLogRecord("test.message", null /* throwable */));
+    assertEquals("", new String(fileOutput.toByteArray(), 
StandardCharsets.UTF_8));
+
+    LogEntry entry = capturedEntry.get();
+    assertNotNull(entry);
+    assertEquals(Instant.ofEpochMilli(1), entry.getInstantTimestamp());
+    assertEquals(Severity.INFO, entry.getSeverity());
+
+    assertNotNull(entry.getResource());
+    assertEquals("dataflow_step", entry.getResource().getType());
     assertEquals(
+        ImmutableMap.of(
+            "job_name",
+            "testJobName",
+            "job_id",
+            "testJobId",
+            "region",
+            "testRegion",
+            "step_id",
+            "",
+            "project_id",
+            "testProject"),
+        entry.getResource().getLabels());
+
+    assertTrue(entry.getPayload() instanceof Payload.JsonPayload);
+    Payload.JsonPayload jsonPayload = entry.getPayload();
+    Map<String, Object> result = jsonPayload.getDataAsMap();
+    assertEquals("test.message", result.get("message"));
+    assertEquals("2", result.get("thread"));
+    assertEquals("LoggerName", result.get("logger"));
+    assertEquals("testWorkerId", result.get("worker"));
+    assertEquals("testWorkId", result.get("work"));
+
+    Map<String, String> labels = entry.getLabels();
+    assertEquals("testJobId", labels.get("dataflow.googleapis.com/job_id"));
+    assertEquals("testJobName", 
labels.get("dataflow.googleapis.com/job_name"));
+    assertEquals("testRegion", labels.get("dataflow.googleapis.com/region"));
+    assertEquals("testWorkerName", 
labels.get("compute.googleapis.com/resource_name"));
+    assertEquals("instance", 
labels.get("compute.googleapis.com/resource_type"));
+  }
+
+  private static Object messageInEntry(LogEntry entry) {
+    return ((Payload.JsonPayload) 
entry.getPayload()).getDataAsMap().get("message");
+  }
+
+  @Test
+  public void testDirectLoggingFallback() throws IOException {
+    ByteArrayOutputStream fileOutput = new ByteArrayOutputStream();
+    FixedOutputStreamFactory factory = new 
FixedOutputStreamFactory(fileOutput);
+    ConcurrentLinkedDeque<LogEntry> capturedEntries = new 
ConcurrentLinkedDeque<>();
+    final AtomicBoolean failNext = new AtomicBoolean(false);
+    DataflowWorkerLoggingHandler handler = new 
DataflowWorkerLoggingHandler(factory, 0);
+    Consumer<LogEntry> directLogConsumer =
+        (LogEntry e) -> {
+          if (failNext.get()) {
+            throw new RuntimeException("Failure");
+          }
+          capturedEntries.add(e);
+        };
+    handler.enableDirectLogging(pipelineOptionsForTest(), Level.SEVERE, 
directLogConsumer);
+    handler.publish(createLogRecord("test.message", null /* throwable */));
+    assertEquals("", new String(fileOutput.toByteArray(), 
StandardCharsets.UTF_8));
+    assertEquals(1, capturedEntries.size());
+    assertEquals("test.message", messageInEntry(capturedEntries.getFirst()));
+    failNext.set(true);
+    capturedEntries.clear();
+
+    // This message should fail to send and be sent to disk.
+    handler.publish(createLogRecord("test.message2", null /* throwable */));
+
+    String fileContents = new String(fileOutput.toByteArray(), 
StandardCharsets.UTF_8);
+    String fallbackMsg =
+        "\"severity\":\"ERROR\",\"message\":\"Failed to buffer log to send 
directly to cloud logging, falling back to normal logging path for PT2H46M40S. 
Consider increasing the buffer size with --workerDirectLoggerBufferByteLimit 
and --workerDirectLoggerBufferElementLimit.";
+    assertTrue(fileContents + " didn't contain " + fallbackMsg, 
fileContents.contains(fallbackMsg));
+    String expected =
         
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
-            + 
"\"message\":\"test.message\",\"thread\":\"2\",\"job\":\"testJobId\","
-            + 
"\"worker\":\"testWorkerId\",\"work\":\"1\",\"logger\":\"LoggerName\"}"
-            + System.lineSeparator(),
-        createJson(createLogEntry("test.message")));
+            + 
"\"message\":\"test.message2\",\"thread\":\"2\",\"logger\":\"LoggerName\"}";
+    assertTrue(fileContents + " didn't contain " + expected, 
fileContents.contains(expected));
+    assertEquals(0, capturedEntries.size());
+    fileOutput.reset();
+
+    // This message should also be sent to disk because we are backing off.
+    failNext.set(false);
+    handler.publish(createLogRecord("test.message3", null /* throwable */));
+    String expected2 =
+        
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
+            + 
"\"message\":\"test.message3\",\"thread\":\"2\",\"logger\":\"LoggerName\"}"
+            + System.lineSeparator();
+    assertEquals(expected2, new String(fileOutput.toByteArray(), 
StandardCharsets.UTF_8));
+    assertEquals(0, capturedEntries.size());
   }
 
   @Test
-  public void testBeamFnApiLogEntryWithTrace() throws IOException {
-    DataflowWorkerLoggingMDC.setJobId("testJobId");
-    DataflowWorkerLoggingMDC.setWorkerId("testWorkerId");
-    DataflowWorkerLoggingMDC.setWorkId("testWorkId");
+  public void testConstructDirectLogEntryWithoutStage() throws IOException {
+    LogEntry entry = createLogEntry(createLogRecord("test.message", null /* 
throwable */));
+    assertEquals(
+        Payload.JsonPayload.of(
+            ImmutableMap.of("message", "test.message", "thread", "2", 
"logger", "LoggerName")),
+        entry.getPayload());
+    assertEquals(Severity.INFO, entry.getSeverity());
+    assertNull(entry.getResource());
+  }
+
+  @Test
+  public void testConstructLogEntryWithAllValuesInMDC() throws IOException {
+    DataflowExecutionState state = new 
TestDataflowExecutionState(nameContextForTest(), "activity");
+    tracker.enterState(state);
+
+    String testStage = "testStage";
+    String testWorkerId = "testWorkerId";
+    String testWorkId = "testWorkId";
+    String testJobId = "testJobId";
 
+    DataflowWorkerLoggingMDC.setStageName(testStage);
+    DataflowWorkerLoggingMDC.setWorkerId(testWorkerId);
+    DataflowWorkerLoggingMDC.setWorkId(testWorkId);
+    DataflowWorkerLoggingMDC.setJobId(testJobId);
+
+    LogEntry entry = createLogEntry(createLogRecord("test.message", null /* 
throwable */));
     assertEquals(
-        
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
-            + 
"\"message\":\"test.message\",\"thread\":\"2\",\"job\":\"testJobId\","
-            + 
"\"worker\":\"testWorkerId\",\"work\":\"1\",\"logger\":\"LoggerName\","
-            + "\"exception\":\"testTrace\"}"
-            + System.lineSeparator(),
-        
createJson(createLogEntry("test.message").toBuilder().setTrace("testTrace").build()));
+        Payload.JsonPayload.of(
+            ImmutableMap.of(
+                "message",
+                "test.message",
+                "thread",
+                "2",
+                "logger",
+                "LoggerName",
+                "stage",
+                testStage,
+                "step",
+                NameContextsForTests.USER_NAME,
+                "worker",
+                testWorkerId,
+                "work",
+                testWorkId,
+                "job",
+                testJobId)),
+        entry.getPayload());
+    assertEquals(Severity.INFO, entry.getSeverity());
+    assertEquals("dataflow_step", entry.getResource().getType());
+    // The testing setup just sets the additional label, not all the default 
labels.
+    assertEquals(
+        ImmutableMap.of("step_id", NameContextsForTests.USER_NAME),
+        entry.getResource().getLabels());
+  }
+
+  @Test
+  public void testDirectLoggingWithMessageUsingCustomFormatter() throws 
IOException {
+    Formatter customFormatter =
+        new SimpleFormatter() {
+          @Override
+          public synchronized String formatMessage(LogRecord record) {
+            return MDC.get("testMdcKey") + ":" + super.formatMessage(record);
+          }
+        };
+    try (MDC.MDCCloseable ignored = MDC.putCloseable("testMdcKey", 
"testMdcValue")) {
+      LogEntry entry =
+          createLogEntry(
+              createLogRecord("test.message", null /* throwable */), 
customFormatter, null);
+      assertEquals(
+          Payload.JsonPayload.of(
+              ImmutableMap.of(
+                  "message", "testMdcValue:test.message", "thread", "2", 
"logger", "LoggerName")),
+          entry.getPayload());
+    }
+  }
+
+  @Test
+  public void testDirectLoggingWithMessageRequiringJulFormatting() throws 
IOException {
+    LogEntry entry =
+        createLogEntry(createLogRecord("test.message {0}", null /* throwable 
*/, "myFormatString"));
+    assertEquals(
+        Payload.JsonPayload.of(
+            ImmutableMap.of(
+                "message", "test.message myFormatString", "thread", "2", 
"logger", "LoggerName")),
+        entry.getPayload());
+  }
+
+  @Test
+  public void testDirectLoggingWithMessageAndException() throws IOException {
+    Throwable t = createThrowable();
+    LogEntry entry = createLogEntry(createLogRecord("test.message", t));
+    assertEquals(
+        Payload.JsonPayload.of(
+            ImmutableMap.of(
+                "message",
+                "test.message",
+                "thread",
+                "2",
+                "logger",
+                "LoggerName",
+                "exception",
+                "java.lang.Throwable: exception.test.message"
+                    + System.lineSeparator()
+                    + "\tat declaringClass1.method1(file1.java:1)"
+                    + System.lineSeparator()
+                    + "\tat declaringClass2.method2(file2.java:1)"
+                    + System.lineSeparator()
+                    + "\tat declaringClass3.method3(file3.java:1)"
+                    + System.lineSeparator())),
+        entry.getPayload());
+  }
+
+  @Test
+  public void testDirectLoggingWithException() throws IOException {
+    Throwable t = createThrowable();
+    LogEntry entry = createLogEntry(createLogRecord(null, t));
+    assertEquals(
+        Payload.JsonPayload.of(
+            ImmutableMap.of(
+                "thread",
+                "2",
+                "logger",
+                "LoggerName",
+                "exception",
+                "java.lang.Throwable: exception.test.message"
+                    + System.lineSeparator()
+                    + "\tat declaringClass1.method1(file1.java:1)"
+                    + System.lineSeparator()
+                    + "\tat declaringClass2.method2(file2.java:1)"
+                    + System.lineSeparator()
+                    + "\tat declaringClass3.method3(file3.java:1)"
+                    + System.lineSeparator())),
+        entry.getPayload());
+  }
+
+  @Test
+  public void testDirectLoggingWithCustomDataEnabledNoMdc() throws IOException 
{
+    LogEntry entry = createLogEntry(createLogRecord(), null, true);
+    assertEquals(
+        Payload.JsonPayload.of(
+            ImmutableMap.of("message", "test.message", "thread", "2", 
"logger", "LoggerName")),
+        entry.getPayload());
+  }
+
+  @Test
+  public void testDirectLoggingWithCustomDataDisabledWithMdc() throws 
IOException {
+    MDC.clear();
+    try (MDC.MDCCloseable closeable = MDC.putCloseable("key1", "cool value")) {
+      LogEntry entry = createLogEntry(createLogRecord());
+      assertEquals(
+          Payload.JsonPayload.of(
+              ImmutableMap.of("message", "test.message", "thread", "2", 
"logger", "LoggerName")),
+          entry.getPayload());
+    }
+  }
+
+  @Test
+  public void testDirectLoggingWithCustomDataEnabledWithMdc() throws 
IOException {
+    MDC.clear();
+    try (MDC.MDCCloseable ignored = MDC.putCloseable("key1", "cool value");
+        MDC.MDCCloseable ignored2 = MDC.putCloseable("key2", "another")) {
+      LogEntry entry = createLogEntry(createLogRecord(), null, true);
+      assertEquals(
+          Payload.JsonPayload.of(
+              ImmutableMap.of(
+                  "message",
+                  "test.message",
+                  "thread",
+                  "2",
+                  "logger",
+                  "LoggerName",
+                  "custom_data",
+                  ImmutableMap.of("key1", "cool value", "key2", "another"))),
+          entry.getPayload());
+    }
+  }
+
+  @Test
+  public void testDirectLoggingWithoutExceptionOrMessage() throws IOException {
+    LogEntry entry = createLogEntry(createLogRecord(null, null));
+    assertEquals(
+        Payload.JsonPayload.of(ImmutableMap.of("thread", "2", "logger", 
"LoggerName")),
+        entry.getPayload());
+  }
+
+  @Test
+  public void isConfiguredDirectLog() throws IOException {
+    ByteArrayOutputStream fileOutput = new ByteArrayOutputStream();
+    FixedOutputStreamFactory factory = new 
FixedOutputStreamFactory(fileOutput);
+
+    // Using the default log level to determine.
+    LogRecord record = createLogRecord();
+    record.setLevel(Level.WARNING);
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.WARNING));
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.INFO));
+    record.setLevel(Level.SEVERE);
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.WARNING));
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.INFO));
+    record.setLevel(Level.INFO);
+    assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.WARNING));
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.INFO));
+    record.setLevel(Level.FINE);
+    assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.WARNING));
+    assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.INFO));
+
+    // Using an override to determine
+    record.setLevel(Level.INFO);
+    record.setResourceBundle(
+        
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.INFO));
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.WARNING));
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.INFO));
+    record.setResourceBundle(
+        
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.FINE));
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.WARNING));
+    assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.INFO));
+    record.setResourceBundle(
+        
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.WARNING));
+    assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.WARNING));
+    assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.INFO));
+    record.setResourceBundle(
+        
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.SEVERE));
+    assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.WARNING));
+    assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record, 
Level.INFO));
+  }
+
+  @Test
+  public void testDirectLoggingThrottler() {
+    AtomicReference<Instant> fakeNow = new AtomicReference<>(Instant.EPOCH);
+    DirectLoggingThrottler throttler = new 
DirectLoggingThrottler(fakeNow::get);
+    assertTrue(throttler.shouldAttemptDirectLog());
+
+    throttler.setCooldownDuration(Duration.ofSeconds(10));
+
+    // First failure should trigger cooldown and return a message.
+    assertNotNull(throttler.noteDirectLoggingEnqueueFailure());
+    // Subsequent failures during cooldown should not return a message.
+    assertNull(throttler.noteDirectLoggingEnqueueFailure());
+    assertFalse(throttler.shouldAttemptDirectLog());
+
+    for (int i = 0; i < 9; ++i) {
+      fakeNow.set(Instant.ofEpochSecond(i));
+      assertFalse(throttler.shouldAttemptDirectLog());
+    }
+    fakeNow.set(Instant.ofEpochSecond(10));
+    assertTrue(throttler.shouldAttemptDirectLog());
+
+    // Note success and ensure we can still log.
+    throttler.noteDirectLoggingEnqueueSuccess();
+    assertTrue(throttler.shouldAttemptDirectLog());
+
+    fakeNow.set(Instant.ofEpochSecond(400));
+
+    // Fail again, then immediately note success to reset.
+    assertNotNull(throttler.noteDirectLoggingEnqueueFailure());
+    assertFalse(throttler.shouldAttemptDirectLog());
+    throttler.noteDirectLoggingEnqueueSuccess();
+    assertTrue(throttler.shouldAttemptDirectLog());
+
+    // Test error path.
+    assertNotNull(throttler.noteDirectLoggingError());
+    assertFalse(throttler.shouldAttemptDirectLog());
+    assertNull(throttler.noteDirectLoggingError());
+    for (int i = 0; i < 9; ++i) {
+      fakeNow.set(Instant.ofEpochSecond(400 + i));
+      assertFalse(throttler.shouldAttemptDirectLog());
+    }
+    fakeNow.set(Instant.ofEpochSecond(410));
+    assertTrue(throttler.shouldAttemptDirectLog());
   }
 
   /** @return A throwable with a fixed stack trace. */
@@ -399,6 +775,11 @@ public class DataflowWorkerLoggingHandlerTest {
     return throwable;
   }
 
+  /** Creates and returns a simple test LogRecord. */
+  private LogRecord createLogRecord() {
+    return createLogRecord("test.message", null);
+  }
+
   /**
    * Creates and returns a LogRecord with a given message and throwable.
    *
@@ -416,23 +797,4 @@ public class DataflowWorkerLoggingHandlerTest {
     logRecord.setParameters(params);
     return logRecord;
   }
-
-  /**
-   * Creates and returns a BeamFnApi.LogEntry with a given message and throwable.
-   *
-   * @param message The message to place in the {@link
-   *     org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry}
-   * @return A {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry} with the given
-   *     message.
-   */
-  private BeamFnApi.LogEntry createLogEntry(String message) {
-    return BeamFnApi.LogEntry.newBuilder()
-        .setLogLocation("LoggerName")
-        .setSeverity(BeamFnApi.LogEntry.Severity.Enum.INFO)
-        .setMessage(message)
-        .setInstructionId("1")
-        .setThread("2")
-        .setTimestamp(Timestamp.newBuilder().setSeconds(0).setNanos(1 * 1000000))
-        .build();
-  }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
index 425b2140a96..7537c17a17b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
@@ -29,6 +29,8 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -43,6 +45,7 @@ import java.util.logging.Level;
 import java.util.logging.LogManager;
 import java.util.logging.LogRecord;
 import java.util.logging.Logger;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -64,7 +67,7 @@ import org.slf4j.LoggerFactory;
  * Unit tests for {@link DataflowWorkerLoggingInitializer}.
  *
  * <p>Tests which validate written log messages should assume that other background tasks may
- * concurrently be writing log messages, since registered log handlers are global. Therefore it is
+ * concurrently be writing log messages, since registered log handlers are global. Therefore, it is
  * not safe to assert on log counts or whether the retrieved log collection is empty.
  */
 @RunWith(JUnit4.class)
@@ -74,16 +77,14 @@ public class DataflowWorkerLoggingInitializerTest {
 
  @Rule public RestoreSystemProperties restoreProperties = new RestoreSystemProperties();
 
-  // Should match {@link DataflowWorkerLoggingInitializer#FILEPATH_PROPERTY}
-  private static final String LOGPATH_PROPERTY = "dataflow.worker.logging.filepath";
-
   @Before
   public void setUp() {
    Path logFileBasePath = Paths.get(logFolder.getRoot().getAbsolutePath(), "logfile.txt");
-    System.setProperty(LOGPATH_PROPERTY, logFileBasePath.toString());
+    System.setProperty(RUNNER_FILEPATH_PROPERTY, logFileBasePath.toString());
     LogManager.getLogManager().reset();
     DataflowWorkerLoggingInitializer.reset();
     DataflowWorkerLoggingInitializer.initialize();
+    DataflowWorkerLoggingInitializer.testDirectLoggingInterceptor = (e) -> {};
   }
 
   @After
@@ -102,7 +103,7 @@ public class DataflowWorkerLoggingInitializerTest {
     Logger rootLogger = LogManager.getLogManager().getLogger("");
     assertEquals(1, rootLogger.getHandlers().length);
     assertEquals(Level.INFO, rootLogger.getLevel());
-    assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0], Level.ALL);
+    assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0]);
   }
 
   @Test
@@ -116,7 +117,7 @@ public class DataflowWorkerLoggingInitializerTest {
     Logger rootLogger = LogManager.getLogManager().getLogger("");
     assertEquals(1, rootLogger.getHandlers().length);
     assertEquals(Level.WARNING, rootLogger.getLevel());
-    assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0], Level.ALL);
+    assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0]);
   }
 
   @Test
@@ -129,7 +130,7 @@ public class DataflowWorkerLoggingInitializerTest {
     Logger rootLogger = LogManager.getLogManager().getLogger("");
     assertEquals(1, rootLogger.getHandlers().length);
     assertEquals(Level.WARNING, rootLogger.getLevel());
-    assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0], Level.ALL);
+    assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0]);
   }
 
   @Test
@@ -154,6 +155,26 @@ public class DataflowWorkerLoggingInitializerTest {
     assertTrue(aLogger.getUseParentHandlers());
   }
 
+  @Test
+  public void testWithDirectLogging() {
+    DataflowWorkerLoggingOptions options =
+        PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
+    options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.WARN);
+    options.setDefaultWorkerDirectLoggerLevel(DataflowWorkerLoggingOptions.Level.DEBUG);
+
+    DataflowWorkerLoggingInitializer.configure(options);
+
+    Logger rootLogger = LogManager.getLogManager().getLogger("");
+    assertEquals(1, rootLogger.getHandlers().length);
+    assertEquals(Level.FINE, rootLogger.getLevel());
+    DataflowWorkerLoggingHandler handler =
+        assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0]);
+    assertTrue(handler.testVerifyIsConfiguredDirectLog(new LogRecord(Level.FINE, "")));
+    assertTrue(handler.testVerifyIsConfiguredDirectLog(new LogRecord(Level.INFO, "")));
+    assertFalse(handler.testVerifyIsConfiguredDirectLog(new LogRecord(Level.WARNING, "")));
+    assertFalse(handler.testVerifyIsConfiguredDirectLog(new LogRecord(Level.SEVERE, "")));
+  }
+
   @Test
   public void testWithSdkHarnessCustomLogLevels() {
    SdkHarnessOptions options = PipelineOptionsFactory.as(SdkHarnessOptions.class);
@@ -175,9 +196,71 @@ public class DataflowWorkerLoggingInitializerTest {
     assertTrue(aLogger.getUseParentHandlers());
   }
 
-  private void assertIsDataflowWorkerLoggingHandler(Handler handler, Level level) {
+  @Test
+  public void testWithSdkHarnessCustomLogLevelsWithDirect() {
+    SdkHarnessOptions options = PipelineOptionsFactory.as(SdkHarnessOptions.class);
+    options.setDefaultSdkHarnessLogLevel(SdkHarnessOptions.LogLevel.WARN);
+    String a = "testWithSdkHarnessCustomLogLevelsWithDirectA";
+    String b = "testWithSdkHarnessCustomLogLevelsWithDirectB";
+    String c = "testWithSdkHarnessCustomLogLevelsWithDirectC";
+    String d = "testWithSdkHarnessCustomLogLevelsWithDirectD";
+    options.setSdkHarnessLogLevelOverrides(
+        new SdkHarnessLogLevelOverrides()
+            .addOverrideForName(a, SdkHarnessOptions.LogLevel.INFO)
+            .addOverrideForName(b, SdkHarnessOptions.LogLevel.ERROR)
+            .addOverrideForName(c, SdkHarnessOptions.LogLevel.WARN));
+    DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
+    loggingOptions.setDefaultWorkerDirectLoggerLevel(DataflowWorkerLoggingOptions.Level.DEBUG);
+    loggingOptions.setWorkerDirectLogLevelOverrides(
+        new WorkerLogLevelOverrides()
+            .addOverrideForName(b, DataflowWorkerLoggingOptions.Level.TRACE)
+            .addOverrideForName(c, DataflowWorkerLoggingOptions.Level.ERROR)
+            .addOverrideForName(d, DataflowWorkerLoggingOptions.Level.INFO));
+
+    DataflowWorkerLoggingInitializer.configure(loggingOptions);
+
+    {
+      Logger aLogger = LogManager.getLogManager().getLogger(a);
+      assertNoHandlersButParent(aLogger);
+      assertEquals(Level.FINE, aLogger.getLevel());
+      assertEquals(
+          DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.INFO),
+          aLogger.getResourceBundle());
+    }
+
+    {
+      Logger bLogger = LogManager.getLogManager().getLogger(b);
+      assertNoHandlersButParent(bLogger);
+      assertEquals(Level.FINEST, bLogger.getLevel());
+      assertEquals(
+          DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.SEVERE),
+          bLogger.getResourceBundle());
+    }
+
+    {
+      Logger cLogger = LogManager.getLogManager().getLogger(c);
+      assertNoHandlersButParent(cLogger);
+      assertEquals(Level.WARNING, cLogger.getLevel());
+      assertNull(cLogger.getResourceBundle());
+    }
+
+    {
+      Logger dLogger = LogManager.getLogManager().getLogger(d);
+      assertNoHandlersButParent(dLogger);
+      assertEquals(Level.INFO, dLogger.getLevel());
+      assertNull(dLogger.getResourceBundle());
+    }
+  }
+
+  private DataflowWorkerLoggingHandler assertIsDataflowWorkerLoggingHandler(Handler handler) {
     assertThat(handler, instanceOf(DataflowWorkerLoggingHandler.class));
-    assertEquals(level, handler.getLevel());
+    assertEquals(Level.ALL, handler.getLevel());
+    return (DataflowWorkerLoggingHandler) handler;
+  }
+
+  private void assertNoHandlersButParent(Logger logger) {
+    assertTrue(logger.getUseParentHandlers());
+    assertEquals(0, logger.getHandlers().length);
   }
 
   @Test
@@ -309,10 +392,12 @@ public class DataflowWorkerLoggingInitializerTest {
 
   private List<String> retrieveLogLines() throws IOException {
     List<String> allLogLines = Lists.newArrayList();
-    for (File logFile : logFolder.getRoot().listFiles()) {
-      allLogLines.addAll(Files.readAllLines(logFile.toPath(), StandardCharsets.UTF_8));
+    @Nullable File[] files = logFolder.getRoot().listFiles();
+    if (files != null) {
+      for (File logFile : files) {
+        allLogLines.addAll(Files.readAllLines(logFile.toPath(), StandardCharsets.UTF_8));
+      }
     }
-
     return allLogLines;
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
index 62df1342f1d..e7cbc453c01 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
@@ -70,7 +70,7 @@ public class JulHandlerPrintStreamAdapterFactoryTest {
   public void testLogRecordMetadata() {
     PrintStream printStream =
         JulHandlerPrintStreamAdapterFactory.create(
-            handler, "fooLogger", Level.WARNING, StandardCharsets.UTF_8);
+            handler::publish, "fooLogger", Level.WARNING, StandardCharsets.UTF_8);
     printStream.println("anyMessage");
 
     assertThat(handler.getLogs(), not(empty()));
@@ -207,6 +207,6 @@ public class JulHandlerPrintStreamAdapterFactoryTest {
 
   private PrintStream createPrintStreamAdapter() {
     return JulHandlerPrintStreamAdapterFactory.create(
-        handler, LOGGER_NAME, Level.INFO, StandardCharsets.UTF_8);
+        handler::publish, LOGGER_NAME, Level.INFO, StandardCharsets.UTF_8);
   }
 }
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
index e566c1446b2..f9d8e5ac350 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
@@ -77,14 +77,22 @@ public class GceMetadataUtil {
   }
 
   public static String fetchDataflowJobId() {
-    return GceMetadataUtil.fetchCustomGceMetadata("job_id");
+    return fetchCustomGceMetadata("job_id");
   }
 
   public static String fetchDataflowJobName() {
-    return GceMetadataUtil.fetchCustomGceMetadata("job_name");
+    return fetchCustomGceMetadata("job_name");
   }
 
   public static String fetchDataflowWorkerId() {
-    return GceMetadataUtil.fetchVmInstanceMetadata("id");
+    return fetchVmInstanceMetadata("id");
+  }
+
+  public static String fetchDataflowWorkerName() {
+    return fetchVmInstanceMetadata("name");
+  }
+
+  public static String fetchDataflowRegion() {
+    return fetchVmInstanceMetadata("cloud_region");
   }
 }

Reply via email to