This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 57ab2b96113 [Dataflow Java Runner] Add support for sending logs
directly to Cloud Logging (#37662)
57ab2b96113 is described below
commit 57ab2b961133ec47111ba9e3bbd16610901b84aa
Author: Sam Whittle <[email protected]>
AuthorDate: Tue Mar 10 19:42:12 2026 +0000
[Dataflow Java Runner] Add support for sending logs directly to Cloud
Logging (#37662)
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
runners/google-cloud-dataflow-java/build.gradle | 2 +
.../options/DataflowWorkerLoggingOptions.java | 74 ++
.../google-cloud-dataflow-java/worker/build.gradle | 7 +-
.../logging/DataflowWorkerLoggingHandler.java | 778 +++++++++++++++++----
.../logging/DataflowWorkerLoggingInitializer.java | 180 ++++-
.../JulHandlerPrintStreamAdapterFactory.java | 11 +-
.../logging/DataflowWorkerLoggingHandlerTest.java | 504 +++++++++++--
.../DataflowWorkerLoggingInitializerTest.java | 111 ++-
.../JulHandlerPrintStreamAdapterFactoryTest.java | 4 +-
.../sdk/extensions/gcp/util/GceMetadataUtil.java | 14 +-
11 files changed, 1425 insertions(+), 262 deletions(-)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 31248c641e3..9382527fa36 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -758,6 +758,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_datastore_v1_proto_client :
"com.google.cloud.datastore:datastore-v1-proto-client:2.34.0", //
[bomupgrader] sets version
google_cloud_firestore :
"com.google.cloud:google-cloud-firestore", //
google_cloud_platform_libraries_bom sets version
google_cloud_kms :
"com.google.cloud:google-cloud-kms", // google_cloud_platform_libraries_bom
sets version
+ google_cloud_logging :
"com.google.cloud:google-cloud-logging", // google_cloud_platform_libraries_bom
sets version
google_cloud_pubsub :
"com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom
sets version
// [bomupgrader] the BOM version is set by
scripts/tools/bomupgrader.py. If update manually, also update
// libraries-bom version on
sdks/java/container/license_scripts/dep_urls_java.yaml
@@ -773,7 +774,6 @@ class BeamModulePlugin implements Plugin<Project> {
google_http_client_apache_v2 :
"com.google.http-client:google-http-client-apache-v2", //
google_cloud_platform_libraries_bom sets version
google_http_client_gson :
"com.google.http-client:google-http-client-gson", //
google_cloud_platform_libraries_bom sets version
google_http_client_jackson :
"com.google.http-client:google-http-client-jackson:1.29.2",
- google_http_client_gson :
"com.google.http-client:google-http-client-gson", //
google_cloud_platform_libraries_bom sets version
google_http_client_protobuf :
"com.google.http-client:google-http-client-protobuf", //
google_cloud_platform_libraries_bom sets version
google_oauth_client :
"com.google.oauth-client:google-oauth-client:$google_oauth_clients_version",
google_oauth_client_java6 :
"com.google.oauth-client:google-oauth-client-java6:$google_oauth_clients_version",
diff --git a/runners/google-cloud-dataflow-java/build.gradle
b/runners/google-cloud-dataflow-java/build.gradle
index c064aeed002..5a90926a239 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -110,6 +110,8 @@ dependencies {
implementation library.java.google_http_client
implementation library.java.google_http_client_gson
permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
+ implementation library.java.google_cloud_logging
+ permitUnusedDeclared library.java.google_cloud_logging // BEAM-11761
implementation library.java.hamcrest
implementation library.java.jackson_annotations
implementation library.java.jackson_core
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
index 5322539b80e..ab412d6cfab 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java
@@ -66,6 +66,38 @@ public interface DataflowWorkerLoggingOptions extends
PipelineOptions {
void setDefaultWorkerLogLevel(Level level);
+ /**
+ * Controls the log level for which messages are uploaded to Cloud Logging.
If a message is
+ * configured to be sent both directly to cloud logging and to default
disk-based logging, it will
+ * just be sent to disk-based logging. This allows for configuration such as
+ * "--defaultWorkerLogLevel=WARN --defaultWorkerDirectLoggerLevel=INFO",
where INFO logs will be
+ * directly sent to cloud logging and WARN logs and higher will be sent to
disk-based logging.
+ *
+ * <p>Note that this is just the default and may be overridden for specific
classes with
+ * --workerDirectLogLevelOverrides.
+ */
+ @Description(
+ "Controls the default direct to Cloud Logging level of all logs without
a log level override. "
+ + "If a message is configured to be sent both directly to cloud
logging and to default disk-based logging, "
+ + "it will just be sent to disk-based logging.")
+ @Default.Enum("OFF")
+ Level getDefaultWorkerDirectLoggerLevel();
+
+ void setDefaultWorkerDirectLoggerLevel(Level level);
+
+ @Description(
+ "The maximum buffered bytes for records in the queue that are being sent
directly to Cloud Logging.")
+ @Default.Long(100L * 1024 * 1024)
+ Long getWorkerDirectLoggerBufferByteLimit();
+
+ void setWorkerDirectLoggerBufferByteLimit(Long value);
+
+ @Description("The maximum buffered elements in the queue being sent directly
to Cloud Logging.")
+ @Default.Long(1_000_000)
+ Long getWorkerDirectLoggerBufferElementLimit();
+
+ void setWorkerDirectLoggerBufferElementLimit(Long value);
+
/**
* Controls the log level given to messages printed to {@code System.out}.
*
@@ -121,6 +153,48 @@ public interface DataflowWorkerLoggingOptions extends
PipelineOptions {
void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
+ /**
+ * This option controls the direct log levels for specifically named
loggers. If a message is
+ * configured to be sent both directly to cloud logging and to default
disk-based logging, it will
+ * just be sent to disk-based logging. If an override only exists for a
logger for direct logging,
+ * the --defaultWorkerLogLevel will be used for the non-direct configuration
for the logger.
+ *
+ * <p>Later options with equivalent names override earlier options.
+ *
+ * <p>See {@link WorkerLogLevelOverrides} for more information on how to
configure logging on a
+ * per {@link Class}, {@link Package}, or name basis. If used from the
command line, the expected
+ * format is {"Name":"Level",...}, further details on {@link
WorkerLogLevelOverrides#from}.
+ */
+ @Description(
+ "This option controls the direct log levels for specifically named
loggers. "
+ + "The expected format is {\"Name\":\"Level\",...}. The Dataflow
worker supports a logging "
+ + "hierarchy based off of names that are '.' separated. For example,
by specifying the value "
+ + "{\"a.b.c.Foo\":\"DEBUG\"}, the logger for the class 'a.b.c.Foo'
will be configured to "
+ + "output logs at the DEBUG level. Similarly, by specifying the
value {\"a.b.c\":\"WARN\"}, "
+ + "all loggers underneath the 'a.b.c' package will be configured to
output logs at the WARN "
+ + "level. System.out and System.err levels are configured via
loggers of the corresponding "
+ + "name. Also, note that when multiple overrides are specified, the
exact name followed by "
+ + "the closest parent takes precedence. Note that if an override is
just provided for the direct log level "
+ + "for a logger, the default non-direct log level will be used for
non-direct logs.")
+ WorkerLogLevelOverrides getWorkerDirectLogLevelOverrides();
+
+ void setWorkerDirectLogLevelOverrides(WorkerLogLevelOverrides value);
+
+ @Default.Boolean(true)
+ @Description(
+ "If true, when there are errors with sending logs directly to Cloud
Logging, the logs will fall back to "
+ + "disk-based logging. If false, such logs will be dropped.")
+ Boolean getDirectLoggingFallbackToDiskOnErrors();
+
+ void setDirectLoggingFallbackToDiskOnErrors(Boolean value);
+
+ @Default.Integer(10)
+ @Description(
+ "If an error is encountered with sending logs directly to Cloud Logging,
direct logging will not be attempted for this many seconds.")
+ Integer getDirectLoggingCooldownSeconds();
+
+ void setDirectLoggingCooldownSeconds(Integer value);
+
/**
* Defines a log level override for a specific class, package, or name.
*
diff --git a/runners/google-cloud-dataflow-java/worker/build.gradle
b/runners/google-cloud-dataflow-java/worker/build.gradle
index 4068c5f88e4..610fa411459 100644
--- a/runners/google-cloud-dataflow-java/worker/build.gradle
+++ b/runners/google-cloud-dataflow-java/worker/build.gradle
@@ -151,7 +151,6 @@ applyJavaNature(
exclude "META-INF/LICENSE.txt"
exclude "about.html"
})
-
/******************************************************************************/
// Configure the worker root project
@@ -167,6 +166,11 @@ configurations {
dependencies {
implementation
enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
+ implementation library.java.gax
+ implementation library.java.google_cloud_core
+ implementation library.java.guava
+ permitUnusedDeclared library.java.guava
+ implementation library.java.protobuf_java
// Note that any dependency that is modified here should also be modified
within
// runners/google-cloud-dataflow-java/worker/build.gradle using the rules
provided
@@ -193,6 +197,7 @@ dependencies {
implementation library.java.google_auth_library_credentials
implementation library.java.proto_google_common_protos
+ implementation library.java.google_cloud_logging
// Conscrypt shouldn't be included here because Conscrypt won't work when
being shaded.
// (Context: https://github.com/apache/beam/pull/13846)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
index 14af9851080..e81c277c7d7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.runners.dataflow.worker.logging;
+import static
org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils.abbreviateMiddle;
import static
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer.LEVELS;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
@@ -25,6 +28,22 @@ import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.batching.FlowController;
+import com.google.cloud.MonitoredResource;
+import com.google.cloud.ServiceOptions;
+import com.google.cloud.logging.LogEntry;
+import com.google.cloud.logging.Logging;
+import com.google.cloud.logging.LoggingLevel;
+import com.google.cloud.logging.LoggingOptions;
+import com.google.cloud.logging.Payload;
+import com.google.cloud.logging.Severity;
+import com.google.cloud.logging.Synchronicity;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.errorprone.annotations.concurrent.LazyInit;
+import com.google.protobuf.Struct;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -33,22 +52,43 @@ import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
import java.util.Date;
-import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.logging.ErrorManager;
import java.util.logging.Handler;
+import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.SimpleFormatter;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.sdk.options.PipelineOptions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CountingOutputStream;
import org.slf4j.MDC;
@@ -56,30 +96,9 @@ import org.slf4j.MDC;
* Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception
is represented using
* {@link Throwable#printStackTrace()}.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
+@SuppressWarnings("method.invocation")
public class DataflowWorkerLoggingHandler extends Handler {
- private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String>
- BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL;
-
- static {
- BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new
EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class);
- // Note that Google Cloud Logging only defines a fixed number of
severities and maps "TRACE"
- // onto "DEBUG" as seen here:
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
- //
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE,
"DEBUG");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG,
"DEBUG");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO,
"INFO");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE,
"NOTICE");
- // Note that Google Cloud Logging only defines a fixed number of
severities and maps "WARN" onto
- // "WARNING" as seen here:
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
- //
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN,
"WARNING");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR,
"ERROR");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL,
"CRITICAL");
- }
-
/**
* Buffer size to use when writing logs. This matches <a
* href="https://cloud.google.com/logging/quotas#log-limits">Logging usage
limits</a> to avoid
@@ -87,9 +106,107 @@ public class DataflowWorkerLoggingHandler extends Handler {
*/
private static final int LOGGING_WRITER_BUFFER_SIZE = 262144; // 256kb
+ // Used as a side-channel for a Logger for which the configured non-direct
logging level doesn't
+ // match the default logging level and which is using direct logging.
+ private static class DirectHintResourceBundle extends ResourceBundle {
+ private static final String LEVEL_KEY = "NonDirectLogLevel";
+ private final Level nonDirectLogLevel;
+
+ DirectHintResourceBundle(Level nonDirectLogLevel) {
+ this.nonDirectLogLevel = nonDirectLogLevel;
+ }
+
+ @Override
+ public String getBaseBundleName() {
+ return "DataflowWorkerLoggingHandler";
+ }
+
+ @Override
+ protected Object handleGetObject(@Nonnull String s) {
+ if (LEVEL_KEY.equals(s)) {
+ return nonDirectLogLevel;
+ }
+ return new MissingResourceException(
+ "The only valid key is " + LEVEL_KEY, this.getClass().getName(), s);
+ }
+
+ @Override
+ public @Nonnull Enumeration<String> getKeys() {
+ return Iterators.asEnumeration(Iterators.singletonIterator(LEVEL_KEY));
+ }
+ }
+ // Since there are just a couple possible levels, we cache them.
+ private static final ConcurrentHashMap<Level, ResourceBundle>
resourceBundles =
+ new ConcurrentHashMap<>();
+
+ @VisibleForTesting
+ static ResourceBundle resourceBundleForNonDirectLogLevelHint(Level
nonDirectLogLevel) {
+ return resourceBundles.computeIfAbsent(nonDirectLogLevel,
DirectHintResourceBundle::new);
+ }
+
/** If true, add SLF4J MDC to custom_data of the log message. */
+ @LazyInit private boolean logCustomMdc = false;
+
+ // Only instantiated and set if enableDirectLogging is called.
+ private static class DirectLoggingState {
+ DirectLoggingState(
+ ImmutableMap<String, String> defaultLabels,
+ ImmutableMap<String, String> defaultResourceLabels,
+ MonitoredResource steplessMonitoredResource,
+ Level defaultNonDirectLogLevel,
+ boolean fallbackDirectErrorsToDisk,
+ @Nullable Logging directLogging,
+ Logging.WriteOption[] directWriteOptions,
+ @Nullable Consumer<LogEntry> testDirectLogInterceptor) {
+ checkState((directLogging == null) != (testDirectLogInterceptor ==
null));
+ this.defaultLabels = defaultLabels;
+ this.defaultResourceLabels = defaultResourceLabels;
+ this.steplessMonitoredResource = steplessMonitoredResource;
+ this.fallbackDirectErrorsToDisk = fallbackDirectErrorsToDisk;
+ this.defaultNonDirectLogLevel = defaultNonDirectLogLevel;
+ this.directLogging = directLogging;
+ this.directWriteOptions = directWriteOptions;
+ this.testDirectLogInterceptor =
+ testDirectLogInterceptor == null ? e -> {} :
testDirectLogInterceptor;
+ }
+
+ final @Nullable Logging directLogging;
+ final boolean fallbackDirectErrorsToDisk;
+ final Level defaultNonDirectLogLevel;
+ final Logging.WriteOption[] directWriteOptions;
+ final ImmutableMap<String, String> defaultResourceLabels;
+ final MonitoredResource steplessMonitoredResource;
+ final ImmutableMap<String, String> defaultLabels;
+ final Consumer<LogEntry> testDirectLogInterceptor;
+ }
+
+ private final AtomicReference<DirectLoggingState> directLoggingState = new
AtomicReference<>();
+
@GuardedBy("this")
- private boolean logCustomMdc = false;
+ Instant nextDirectFallbackLogInstant = Instant.EPOCH;
+
+ private static final String LOG_TYPE = "dataflow.googleapis.com%2Fworker";
+ private static final String RESOURCE_TYPE = "dataflow_step";
+ private static final String STEP_RESOURCE_LABEL = "step_id";
+ private static final int FIELD_MAX_LENGTH = 100;
+ private static final int LABEL_MAX_LENGTH = 100;
+ private static final int MESSAGE_MAX_LENGTH = 30_000;
+ // HACK: Detect stderr logs that originate from the LoggingImpl error
handling. Hopefully this can
+ // be removed if error handling in LoggingImpl can be customized to use the
ErrorHandler.
+ private static final String LOGGING_IMPL_STDERR_PREFIX = "ERROR: onFailure
exception:";
+
+ // Null after close().
+ @GuardedBy("this")
+ private @Nullable JsonGenerator generator;
+
+ @GuardedBy("this")
+ private @Nullable CountingOutputStream counter;
+
+ private final long sizeLimit;
+ private final Supplier<OutputStream> outputStreamFactory;
+ private final JsonFactory generatorFactory;
/**
* Formats the throwable as per {@link Throwable#printStackTrace()}.
@@ -97,7 +214,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
* @param thrown The throwable to format.
* @return A string containing the contents of {@link
Throwable#printStackTrace()}.
*/
- public static String formatException(Throwable thrown) {
+ public static @Nullable String formatException(@Nullable Throwable thrown) {
if (thrown == null) {
return null;
}
@@ -108,15 +225,14 @@ public class DataflowWorkerLoggingHandler extends Handler
{
return sw.toString();
}
- /** Constructs a handler that writes to a rotating set of files. */
- public DataflowWorkerLoggingHandler(String filename, long sizeLimit) throws
IOException {
- this(new FileOutputStreamFactory(filename), sizeLimit);
- }
-
/**
* Constructs a handler that writes to arbitrary output streams. No rollover
if sizeLimit is zero
* or negative.
*/
+ public DataflowWorkerLoggingHandler(String filename, long sizeLimit) throws
IOException {
+ this(new FileOutputStreamFactory(filename), sizeLimit);
+ }
+
DataflowWorkerLoggingHandler(Supplier<OutputStream> factory, long sizeLimit)
throws IOException {
this.setFormatter(new SimpleFormatter());
this.outputStreamFactory = factory;
@@ -131,28 +247,335 @@ public class DataflowWorkerLoggingHandler extends
Handler {
createOutputStream();
}
- public synchronized void setLogMdc(boolean enabled) {
- this.logCustomMdc = enabled;
+ public void setLogMdc(boolean enabled) {
+ logCustomMdc = enabled;
}
- @Override
- public synchronized void publish(LogRecord record) {
- DataflowExecutionState currrentDataflowState = null;
- ExecutionState currrentState =
ExecutionStateTracker.getCurrentExecutionState();
- if (currrentState instanceof DataflowExecutionState) {
- currrentDataflowState = (DataflowExecutionState) currrentState;
- }
- // It's okay to pass in the null state, publish() handles and tests this.
- publish(currrentDataflowState, record);
+ private static Pair<ImmutableMap<String, String>, ImmutableMap<String,
String>>
+ labelsFromOptionsAndMetadata(PipelineOptions options) {
+ DataflowPipelineOptions dataflowOptions =
options.as(DataflowPipelineOptions.class);
+ DataflowWorkerHarnessOptions harnessOptions =
options.as(DataflowWorkerHarnessOptions.class);
+ @Nullable String jobId = harnessOptions.getJobId();
+ if (jobId == null || jobId.isEmpty()) {
+ jobId = GceMetadataUtil.fetchDataflowJobId();
+ }
+ jobId = middleCrop(jobId, LABEL_MAX_LENGTH);
+
+ @Nullable String jobName = dataflowOptions.getJobName();
+ if (jobName == null || jobName.isEmpty()) {
+ jobName = GceMetadataUtil.fetchDataflowJobName();
+ }
+ jobName = middleCrop(jobName, LABEL_MAX_LENGTH);
+
+ String region = dataflowOptions.getRegion();
+ if (region.isEmpty()) {
+ region = GceMetadataUtil.fetchDataflowRegion();
+ }
+ region = middleCrop(region, LABEL_MAX_LENGTH);
+
+ // Note that the id in the options is a string name, not the numeric VM id.
+ @Nullable String workerName = harnessOptions.getWorkerId();
+ if (workerName == null || workerName.isEmpty()) {
+ workerName = GceMetadataUtil.fetchDataflowWorkerName();
+ }
+ workerName = middleCrop(workerName, LABEL_MAX_LENGTH);
+
+ String workerId = middleCrop(GceMetadataUtil.fetchDataflowWorkerId(),
LABEL_MAX_LENGTH);
+
+ @Nullable String projectId = harnessOptions.getProject();
+ if (projectId == null || projectId.isEmpty()) {
+ projectId = checkNotNull(ServiceOptions.getDefaultProjectId());
+ }
+ projectId = middleCrop(projectId, LABEL_MAX_LENGTH);
+
+ ImmutableMap.Builder<String, String> defaultLabelsBuilder = new
ImmutableMap.Builder<>();
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_type",
"instance");
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_name",
workerName);
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_id", workerId);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/region", region);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/job_name", jobName);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/job_id", jobId);
+
+ ImmutableMap.Builder<String, String> resourceLabelBuilder = new
ImmutableMap.Builder<>();
+ resourceLabelBuilder.put("job_id", jobId);
+ resourceLabelBuilder.put("job_name", jobName);
+ resourceLabelBuilder.put("project_id", projectId);
+ resourceLabelBuilder.put("region", region);
+ // We add the step when constructing the resource as it can change.
+
+ return Pair.of(defaultLabelsBuilder.buildOrThrow(),
resourceLabelBuilder.buildOrThrow());
+ }
+
+ private static String middleCrop(String value, int maxSize) {
+ return abbreviateMiddle(value, "...", maxSize);
+ }
+
+ private static Severity severityFor(Level level) {
+ if (level instanceof LoggingLevel) {
+ return ((LoggingLevel) level).getSeverity();
+ }
+ // Choose the severity based on Level range, rounding down.
+ // The assumption is that Level values below maintain same numeric value
+ int value = level.intValue();
+ if (value < Level.INFO.intValue()) {
+ // Includes Level.CONFIG, Level.FINE, Level.FINER, Level.FINEST
+ return Severity.DEBUG;
+ } else if (value < Level.WARNING.intValue()) {
+ return Severity.INFO;
+ } else if (value < Level.SEVERE.intValue()) {
+ return Severity.WARNING;
+ } else if (value < Level.OFF.intValue()) {
+ return Severity.ERROR;
+ }
+ // Level.OFF is special meaning not to log.
+ return Severity.NONE;
}
- public synchronized void publish(DataflowExecutionState
currentExecutionState, LogRecord record) {
- if (!isLoggable(record)) {
+ private static void addLogField(
+ Struct.Builder builder, String field, @Nullable String value, int
maxSize) {
+ if (value == null || value.isEmpty()) {
return;
}
+ builder.putFieldsBuilderIfAbsent(field).setStringValue(middleCrop(value,
maxSize));
+ }
+
+ @VisibleForTesting
+ LogEntry constructDirectLogEntry(
+ LogRecord record,
+ @Nullable DataflowExecutionState executionState,
+ ImmutableMap<String, String> defaultResourceLabels) {
+ Struct.Builder payloadBuilder = Struct.newBuilder();
+ addLogField(payloadBuilder, "logger", record.getLoggerName(),
FIELD_MAX_LENGTH);
+ addLogField(
+ payloadBuilder, "message", getFormatter().formatMessage(record),
MESSAGE_MAX_LENGTH);
+ addLogField(
+ payloadBuilder, "exception", formatException(record.getThrown()),
MESSAGE_MAX_LENGTH);
+ addLogField(payloadBuilder, "thread",
String.valueOf(record.getThreadID()), FIELD_MAX_LENGTH);
+ addLogField(payloadBuilder, "stage",
DataflowWorkerLoggingMDC.getStageName(), FIELD_MAX_LENGTH);
+ addLogField(payloadBuilder, "worker",
DataflowWorkerLoggingMDC.getWorkerId(), FIELD_MAX_LENGTH);
+ addLogField(payloadBuilder, "work", DataflowWorkerLoggingMDC.getWorkId(),
FIELD_MAX_LENGTH);
+ addLogField(payloadBuilder, "job", DataflowWorkerLoggingMDC.getJobId(),
FIELD_MAX_LENGTH);
+ if (logCustomMdc) {
+ @Nullable Map<String, String> mdcMap = MDC.getCopyOfContextMap();
+ if (mdcMap != null && !mdcMap.isEmpty()) {
+ Struct.Builder customDataBuilder =
+
payloadBuilder.putFieldsBuilderIfAbsent("custom_data").getStructValueBuilder();
+ mdcMap.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey())
+ .forEach(
+ (entry) -> {
+ if (entry.getKey() != null) {
+ addLogField(
+ customDataBuilder, entry.getKey(), entry.getValue(),
FIELD_MAX_LENGTH);
+ }
+ });
+ }
+ }
+
+ @Nullable String stepId = null;
+ if (executionState != null) {
+ @Nullable NameContext nameContext = executionState.getStepName();
+ if (nameContext != null) {
+ stepId = nameContext.userName();
+ }
+ }
+ addLogField(payloadBuilder, "step", stepId, FIELD_MAX_LENGTH);
+
+ LogEntry.Builder builder =
+ LogEntry.newBuilder(Payload.JsonPayload.of(payloadBuilder.build()))
+ .setTimestamp(Instant.ofEpochMilli(record.getMillis()))
+ .setSeverity(severityFor(record.getLevel()));
+
+ if (stepId != null) {
+ builder.setResource(
+ MonitoredResource.newBuilder(RESOURCE_TYPE)
+ .setLabels(defaultResourceLabels)
+ .addLabel(STEP_RESOURCE_LABEL, stepId)
+ .build());
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Enables logging of records directly to Cloud Logging instead of logging
to the disk. Should
+ * only be called once.
+ *
+ * @param pipelineOptions the pipelineOptions, used to configure buffers etc.
+ * @param defaultNonDirectLogLevel the default level at which logs should be
logged to disk.
+ * LogEntries that are below this level will be logged directly to Cloud
Logging. The behavior
+ * for a specific LogEntry can be overridden by attaching a
ResourceBundle obtained from
+ * resourceBundleForNonDirectLogLevelHint to it.
+ */
+ synchronized void enableDirectLogging(
+ PipelineOptions pipelineOptions,
+ Level defaultNonDirectLogLevel,
+ @Nullable Consumer<LogEntry> testDirectLogInterceptor) {
+ checkState(
+ directLoggingState.get() == null,
+ "enableDirectLogging should only be called once on a
DataflowWorkerLoggingHandler");
+ Pair<ImmutableMap<String, String>, ImmutableMap<String, String>> labels =
+ labelsFromOptionsAndMetadata(pipelineOptions);
+ ImmutableMap<String, String> defaultLabels = labels.getLeft();
+ ImmutableMap<String, String> defaultResourceLabels = labels.getRight();
+ MonitoredResource steplessMonitoredResource =
+ MonitoredResource.newBuilder(RESOURCE_TYPE)
+ .setLabels(defaultResourceLabels)
+ .addLabel(STEP_RESOURCE_LABEL, "")
+ .build();
+
+ DataflowWorkerLoggingOptions dfLoggingOptions =
+ pipelineOptions.as(DataflowWorkerLoggingOptions.class);
+ directThrottler.setCooldownDuration(
+
Duration.ofSeconds(dfLoggingOptions.getDirectLoggingCooldownSeconds()));
+
+ @Nullable Logging logging = null;
+ ArrayList<Logging.WriteOption> writeOptions = new ArrayList<>();
+ if (testDirectLogInterceptor == null) {
+ // Override some of the default settings.
+ LoggingOptions cloudLoggingOptions =
+ LoggingOptions.newBuilder()
+ .setBatchingSettings(
+ BatchingSettings.newBuilder()
+ .setFlowControlSettings(
+ FlowControlSettings.newBuilder()
+ .setLimitExceededBehavior(
+
FlowController.LimitExceededBehavior.ThrowException)
+ .setMaxOutstandingRequestBytes(
+
dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit())
+ .setMaxOutstandingElementCount(
+
dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit())
+ .build())
+ // These thresholds match the default settings in
+ // LoggingServiceV2StubSettings.java for
writeLogEntries. We must re-set them
+ // when we override the flow control settings because
otherwise
+ // BatchingSettings defaults to no batching.
+ .setElementCountThreshold(
+ Math.min(
+ 1000L,
+ Math.max(
+ 1L,
+
dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit() / 2)))
+ .setRequestByteThreshold(
+ Math.min(
+ 1048576L,
+ Math.max(
+ 1,
dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit() / 2)))
+ .setDelayThresholdDuration(Duration.ofMillis(50L))
+ .build())
+ .build();
+
+ writeOptions.add(Logging.WriteOption.labels(defaultLabels));
+ writeOptions.add(Logging.WriteOption.logName(LOG_TYPE));
+
writeOptions.add(Logging.WriteOption.resource(steplessMonitoredResource));
+
+ logging = cloudLoggingOptions.getService();
+ logging.setFlushSeverity(Severity.NONE);
+ logging.setWriteSynchronicity(Synchronicity.ASYNC);
+ }
+
+ checkState(
+ directLoggingState.getAndSet(
+ new DirectLoggingState(
+ defaultLabels,
+ defaultResourceLabels,
+ steplessMonitoredResource,
+ defaultNonDirectLogLevel,
+
Boolean.TRUE.equals(dfLoggingOptions.getDirectLoggingFallbackToDiskOnErrors()),
+ logging,
+ Iterables.toArray(writeOptions, Logging.WriteOption.class),
+ testDirectLogInterceptor))
+ == null,
+ "enableDirectLogging should only be called once on a
DataflowWorkerLoggingHandler");
+ }
+
+ private static @Nullable DataflowExecutionState
getCurrentDataflowExecutionState() {
+ @Nullable ExecutionState currentState =
ExecutionStateTracker.getCurrentExecutionState();
+ if (!(currentState instanceof DataflowExecutionState)) {
+ return null;
+ }
+ return (DataflowExecutionState) currentState;
+ }
+
+ @Override
+ public void publish(@Nullable LogRecord record) {
+ if (record == null) {
+ return;
+ }
+ publish(getCurrentDataflowExecutionState(), record);
+ }
+
+ public void publish(@Nullable DataflowExecutionState executionState,
LogRecord record) {
+ @Nullable DirectLoggingState direct = directLoggingState.get();
+ if (direct != null && isConfiguredDirectLog(record,
direct.defaultNonDirectLogLevel)) {
+ if (directThrottler.shouldAttemptDirectLog()) {
+ try {
+ LogEntry logEntry =
+ constructDirectLogEntry(record, executionState,
direct.defaultResourceLabels);
+ if (direct.directLogging != null) {
+ checkNotNull(direct.directLogging)
+ .write(ImmutableList.of(logEntry), direct.directWriteOptions);
+ } else {
+ // This is the testing path when testDirectLogInterceptor was
specified.
+ // The default labels are applied by write options generally but
we merge them in here
+ // so they are visible to the test.
+ HashMap<String, String> mergedLabels = new
HashMap<>(direct.defaultLabels);
+ mergedLabels.putAll(logEntry.getLabels());
+ logEntry = logEntry.toBuilder().setLabels(mergedLabels).build();
+ if (logEntry.getResource() == null) {
+ logEntry =
logEntry.toBuilder().setResource(direct.steplessMonitoredResource).build();
+ }
+ checkNotNull(direct.testDirectLogInterceptor).accept(logEntry);
+ }
+ directThrottler.noteDirectLoggingEnqueueSuccess();
+ return;
+ } catch (RuntimeException e) {
+ @Nullable String log =
directThrottler.noteDirectLoggingEnqueueFailure();
+ if (log != null) {
+ printErrorToDisk(log, e);
+ }
+ }
+ }
+
+ // Either we were throttled or encountered an error enqueuing the log.
+ if (!direct.fallbackDirectErrorsToDisk) {
+ boolean shouldLog = false;
+ Instant now = Instant.now();
+ synchronized (this) {
+ if (now.isAfter(nextDirectFallbackLogInstant)) {
+ nextDirectFallbackLogInstant = now.plusSeconds(300);
+ shouldLog = true;
+ }
+ }
+ if (shouldLog) {
+ printErrorToDisk(
+ "Unable to buffer log to send directly to cloud logging and
fallback is disabled, dropping log records.",
+ null);
+ }
+ return;
+ }
+ }
+ publishToDisk(executionState, record);
+ }
+
+ private void printErrorToDisk(String errorMsg, @Nullable Exception ex) {
+ LogRecord record = new LogRecord(Level.SEVERE, errorMsg);
+ record.setLoggerName(DataflowWorkerLoggingHandler.class.getName());
+ if (ex != null) {
+ record.setThrown(ex);
+ }
+ publishToDisk(null, record);
+ }
+
+ @SuppressWarnings({"nullness", "GuardedBy"})
+ public synchronized void publishToDisk(
+ @Nullable DataflowExecutionState currentExecutionState, LogRecord
record) {
rolloverOutputStreamIfNeeded();
+ if (generator == null) {
+ throw new IllegalStateException("rollover should have made generator");
+ }
try {
// Generating a JSON map like:
// {"timestamp": {"seconds": 1435835832, "nanos": 123456789}, ...
"message": "hello"}
@@ -168,21 +591,21 @@ public class DataflowWorkerLoggingHandler extends Handler
{
"severity",
MoreObjects.firstNonNull(LEVELS.get(record.getLevel()),
record.getLevel().getName()));
// Write the other labels.
- writeIfNotEmpty("message", getFormatter().formatMessage(record));
- writeIfNotEmpty("thread", String.valueOf(record.getThreadID()));
- writeIfNotEmpty("job", DataflowWorkerLoggingMDC.getJobId());
- writeIfNotEmpty("stage", DataflowWorkerLoggingMDC.getStageName());
+ writeIfNotEmpty(generator, "message",
getFormatter().formatMessage(record));
+ writeIfNotEmpty(generator, "thread",
String.valueOf(record.getThreadID()));
+ writeIfNotEmpty(generator, "job", DataflowWorkerLoggingMDC.getJobId());
+ writeIfNotEmpty(generator, "stage",
DataflowWorkerLoggingMDC.getStageName());
if (currentExecutionState != null) {
NameContext nameContext = currentExecutionState.getStepName();
if (nameContext != null) {
- writeIfNotEmpty("step", nameContext.userName());
+ writeIfNotEmpty(generator, "step", nameContext.userName());
}
}
- writeIfNotEmpty("worker", DataflowWorkerLoggingMDC.getWorkerId());
- writeIfNotEmpty("work", DataflowWorkerLoggingMDC.getWorkId());
- writeIfNotEmpty("logger", record.getLoggerName());
- writeIfNotEmpty("exception", formatException(record.getThrown()));
+ writeIfNotEmpty(generator, "worker",
DataflowWorkerLoggingMDC.getWorkerId());
+ writeIfNotEmpty(generator, "work", DataflowWorkerLoggingMDC.getWorkId());
+ writeIfNotEmpty(generator, "logger", record.getLoggerName());
+ writeIfNotEmpty(generator, "exception",
formatException(record.getThrown()));
if (logCustomMdc) {
@Nullable Map<String, String> mdcMap = MDC.getCopyOfContextMap();
if (mdcMap != null && !mdcMap.isEmpty()) {
@@ -212,78 +635,131 @@ public class DataflowWorkerLoggingHandler extends
Handler {
// entries will be the inverse of the flush latency. That could be as
little as one hundred
// log entries per second on some systems. For higher throughput this
should be changed to
// batch publish operations while writes and flushes are in flight on a
different thread.
- flush();
+ flushDisk();
}
- public synchronized void publish(BeamFnApi.LogEntry logEntry) {
- if (generator == null || logEntry == null) {
- return;
+ @VisibleForTesting
+ static boolean isConfiguredDirectLog(LogRecord record, Level
defaultNonDirectLogLevel) {
+ @Nullable ResourceBundle resourceBundle = record.getResourceBundle();
+ Level nonDirectLogLevel =
+ (resourceBundle instanceof DirectHintResourceBundle)
+ ? ((DirectHintResourceBundle) resourceBundle).nonDirectLogLevel
+ : defaultNonDirectLogLevel;
+ return nonDirectLogLevel.intValue() > record.getLevel().intValue();
+ }
+
+ @VisibleForTesting
+ boolean testVerifyIsConfiguredDirectLog(LogRecord record) {
+ return isConfiguredDirectLog(
+ record,
checkNotNull(directLoggingState).get().defaultNonDirectLogLevel);
+ }
+
+ @VisibleForTesting
+ static final class DirectLoggingThrottler {
+ private final AtomicBoolean rejectingRequests = new AtomicBoolean(false);
+ private final Supplier<Instant> nowSupplier;
+
+ DirectLoggingThrottler(Supplier<Instant> nowSupplier) {
+ this.nowSupplier = nowSupplier;
}
- rolloverOutputStreamIfNeeded();
+ @GuardedBy("this")
+ private Duration cooldownDuration = Duration.ZERO;
- try {
- // Generating a JSON map like:
- // {"timestamp": {"seconds": 1435835832, "nanos": 123456789}, ...
"message": "hello"}
- generator.writeStartObject();
- // Write the timestamp.
- generator.writeFieldName("timestamp");
- generator.writeStartObject();
- generator.writeNumberField("seconds",
logEntry.getTimestamp().getSeconds());
- generator.writeNumberField("nanos", logEntry.getTimestamp().getNanos());
- generator.writeEndObject();
- // Write the severity.
- generator.writeObjectField(
- "severity",
- MoreObjects.firstNonNull(
- BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.get(logEntry.getSeverity()),
- logEntry.getSeverity().getValueDescriptor().getName()));
- // Write the other labels.
- writeIfNotEmpty("message", logEntry.getMessage());
- writeIfNotEmpty("thread", logEntry.getThread());
- writeIfNotEmpty("job", DataflowWorkerLoggingMDC.getJobId());
- // TODO: Write the stage execution information by translating the
currently execution
- // instruction id to a stage.
- // writeIfNotNull("stage", ...);
- writeIfNotEmpty("step", logEntry.getTransformId());
- writeIfNotEmpty("worker", DataflowWorkerLoggingMDC.getWorkerId());
- // Id should match to id in
//depot/google3/third_party/cloud/dataflow/worker/agent/sdk.go
- writeIfNotEmpty("portability_worker_id",
DataflowWorkerLoggingMDC.getSdkHarnessId());
- writeIfNotEmpty("work", logEntry.getInstructionId());
- writeIfNotEmpty("logger", logEntry.getLogLocation());
- // TODO: Figure out a way to get exceptions transported across Beam Fn
Logging API
- writeIfNotEmpty("exception", logEntry.getTrace());
- generator.writeEndObject();
- generator.writeRaw(System.lineSeparator());
- } catch (IOException | RuntimeException e) {
- reportFailure("Unable to publish", e, ErrorManager.WRITE_FAILURE);
+ @GuardedBy("this")
+ private Instant rejectRequestsUntil = Instant.MIN;
+
+ @GuardedBy("this")
+ private Instant failureStartTime = Instant.MAX;
+
+ @GuardedBy("this")
+ private Instant nextLogTime = Instant.MIN;
+
+ @GuardedBy("this")
+ private Instant nextSendErrorLogTime = Instant.MIN;
+
+ synchronized void setCooldownDuration(Duration cooldownDuration) {
+ this.cooldownDuration = cooldownDuration;
}
- // This implementation is based on that of java.util.logging.FileHandler,
which flushes in a
- // synchronized context like this. Unfortunately the maximum throughput
for generating log
- // entries will be the inverse of the flush latency. That could be as
little as one hundred
- // log entries per second on some systems. For higher throughput this
should be changed to
- // batch publish operations while writes and flushes are in flight on a
different thread.
- flush();
+ boolean shouldAttemptDirectLog() {
+ if (!rejectingRequests.get()) {
+ return true;
+ }
+
+ Instant now = nowSupplier.get();
+ synchronized (this) {
+ return !now.isBefore(rejectRequestsUntil);
+ }
+ }
+
+ void noteDirectLoggingEnqueueSuccess() {
+ if (!rejectingRequests.get()) {
+ return;
+ }
+
+ synchronized (this) {
+ rejectingRequests.set(false);
+ failureStartTime = Instant.MAX;
+ }
+ }
+
+ synchronized @Nullable String noteDirectLoggingEnqueueFailure() {
+ rejectingRequests.set(true);
+ if (failureStartTime.equals(Instant.MAX)) {
+ Instant now = nowSupplier.get();
+ failureStartTime = now;
+ rejectRequestsUntil = now.plus(cooldownDuration);
+ if (nextLogTime.isBefore(now)) {
+ nextLogTime = now.plusSeconds(60);
+ return String.format(
+ "Failed to buffer log to send directly to cloud logging, falling
back to normal logging path for %s. Consider increasing the buffer size with
--workerDirectLoggerBufferByteLimit and
--workerDirectLoggerBufferElementLimit.",
+ cooldownDuration);
+ }
+ }
+ return null;
+ }
+
+ synchronized @Nullable String noteDirectLoggingError() {
+ rejectingRequests.set(true);
+ Instant now = nowSupplier.get();
+ if (failureStartTime.equals(Instant.MAX)) {
+ failureStartTime = now;
+ }
+ rejectRequestsUntil = now.plus(cooldownDuration);
+ if (nextSendErrorLogTime.isBefore(now)) {
+ nextSendErrorLogTime = now.plusSeconds(60);
+ return String.format(
+ "Error occurred while handling direct logs, falling back to normal
logging path for %s.",
+ cooldownDuration);
+ }
+ return null;
+ }
}
- /**
- * Check if a LogRecord will be logged.
- *
- * <p>This method checks if the <tt>LogRecord</tt> has an appropriate level
and whether it
- * satisfies any <tt>Filter</tt>. It will also return false if the handler
has been closed, or the
- * LogRecord is null.
- */
- @Override
- public boolean isLoggable(LogRecord record) {
- return generator != null && record != null && super.isLoggable(record);
+ private final DirectLoggingThrottler directThrottler = new
DirectLoggingThrottler(Instant::now);
+
+ void publishStdErr(LogRecord record) {
+ @Nullable String msg = record.getMessage();
+ if (msg != null && msg.startsWith(LOGGING_IMPL_STDERR_PREFIX)) {
+ @Nullable String throttleMsg = directThrottler.noteDirectLoggingError();
+ if (throttleMsg != null) {
+ printErrorToDisk(throttleMsg, null);
+ }
+ }
+ publishToDisk(getCurrentDataflowExecutionState(), record);
}
- @Override
- public synchronized void flush() {
+ void publishStdOut(LogRecord record) {
+ publishToDisk(getCurrentDataflowExecutionState(), record);
+ }
+
+ private void flushDisk() {
try {
- if (generator != null) {
- generator.flush();
+ synchronized (this) {
+ if (generator != null) {
+ generator.flush();
+ }
}
} catch (IOException | RuntimeException e) {
reportFailure("Unable to flush", e, ErrorManager.FLUSH_FAILURE);
@@ -291,20 +767,39 @@ public class DataflowWorkerLoggingHandler extends Handler
{
}
@Override
- public synchronized void close() {
- // Flush any in-flight content, though there should not actually be any
because
- // the generator is currently flushed in the synchronized publish() method.
- flush();
- // Close the generator and log file.
+ public void flush() {
+ flushDisk();
+ @Nullable DirectLoggingState direct = directLoggingState.get();
+ if (direct != null && direct.directLogging != null) {
+ direct.directLogging.flush();
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (this) {
+ // Flush any in-flight content, though there should not actually be any
because
+ // the generator is currently flushed in the synchronized publish()
method.
+ flush();
+ // Close the generator and log file.
+ try {
+ if (generator != null) {
+ generator.close();
+ }
+ } catch (IOException | RuntimeException e) {
+ reportFailure("Unable to close", e, ErrorManager.CLOSE_FAILURE);
+ } finally {
+ generator = null;
+ counter = null;
+ }
+ }
try {
- if (generator != null) {
- generator.close();
+ @Nullable DirectLoggingState direct = directLoggingState.get();
+ if (direct != null && direct.directLogging != null) {
+ direct.directLogging.close();
}
- } catch (IOException | RuntimeException e) {
+ } catch (Exception e) {
reportFailure("Unable to close", e, ErrorManager.CLOSE_FAILURE);
- } finally {
- generator = null;
- counter = null;
}
}
@@ -330,7 +825,7 @@ public class DataflowWorkerLoggingHandler extends Handler {
}
}
- private void createOutputStream() throws IOException {
+ private synchronized void createOutputStream() throws IOException {
CountingOutputStream stream = new
CountingOutputStream(outputStreamFactory.get());
generator = generatorFactory.createGenerator(stream, JsonEncoding.UTF8);
counter = stream;
@@ -343,20 +838,24 @@ public class DataflowWorkerLoggingHandler extends Handler
{
* Rollover to a new output stream (log file) if we have reached the size
limit. Ensure that the
* rollover fails or succeeds atomically.
*/
- private void rolloverOutputStreamIfNeeded() {
+ private synchronized void rolloverOutputStreamIfNeeded() {
+ if (counter == null) {
+ throw new IllegalStateException("createOutputStream should have initialized counter");
+ }
if (counter.getCount() < sizeLimit) {
return;
}
try {
- JsonGenerator old = generator;
+ @Nullable JsonGenerator old = generator;
createOutputStream();
-
- try {
- // Rollover successful. Attempt to close old stream, but ignore on
failure.
- old.close();
- } catch (IOException | RuntimeException e) {
- reportFailure("Unable to close old log file", e,
ErrorManager.CLOSE_FAILURE);
+ if (old != null) {
+ try {
+ // Rollover successful. Attempt to close old stream, but ignore on
failure.
+ old.close();
+ } catch (IOException | RuntimeException e) {
+ reportFailure("Unable to close old log file", e,
ErrorManager.CLOSE_FAILURE);
+ }
}
} catch (IOException | RuntimeException e) {
reportFailure("Unable to create new log file", e,
ErrorManager.OPEN_FAILURE);
@@ -364,7 +863,8 @@ public class DataflowWorkerLoggingHandler extends Handler {
}
/** Appends a JSON key/value pair if the specified val is not null. */
- private void writeIfNotEmpty(String name, String val) throws IOException {
+ private static void writeIfNotEmpty(JsonGenerator generator, String name,
String val)
+ throws IOException {
if (val != null && !val.isEmpty()) {
generator.writeStringField(name, val);
}
@@ -381,12 +881,4 @@ public class DataflowWorkerLoggingHandler extends Handler {
// Failed to report logging failure. No meaningful action left.
}
}
-
- // Null after close().
- private JsonGenerator generator;
- private CountingOutputStream counter;
-
- private final long sizeLimit;
- private final Supplier<OutputStream> outputStreamFactory;
- private final JsonFactory generatorFactory;
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
index a56c62e9231..0f1b5c2750b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializer.java
@@ -24,16 +24,19 @@ import static
org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOpti
import static
org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.TRACE;
import static
org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.Level.WARN;
+import com.google.cloud.logging.LogEntry;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.Charset;
+import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.function.Consumer;
import java.util.logging.ErrorManager;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
+import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel;
@@ -77,8 +80,8 @@ public class DataflowWorkerLoggingInitializer {
private static final String FILESIZE_MB_PROPERTY =
"dataflow.worker.logging.filesize_mb";
- private static final String SYSTEM_OUT_LOG_NAME = "System.out";
- private static final String SYSTEM_ERR_LOG_NAME = "System.err";
+ static final String SYSTEM_OUT_LOG_NAME = "System.out";
+ static final String SYSTEM_ERR_LOG_NAME = "System.err";
static final ImmutableBiMap<Level, DataflowWorkerLoggingOptions.Level>
LEVELS =
ImmutableBiMap.<Level, DataflowWorkerLoggingOptions.Level>builder()
@@ -108,6 +111,7 @@ public class DataflowWorkerLoggingInitializer {
private static PrintStream originalStdOut;
private static PrintStream originalStdErr = System.err;
private static boolean initialized = false;
+ @VisibleForTesting static @Nullable Consumer<LogEntry>
testDirectLoggingInterceptor = null;
// This is the same as ErrorManager except that it uses the provided
// print stream.
@@ -174,10 +178,13 @@ public class DataflowWorkerLoggingInitializer {
originalStdErr = System.err;
System.setOut(
JulHandlerPrintStreamAdapterFactory.create(
- loggingHandler, SYSTEM_OUT_LOG_NAME, Level.INFO,
Charset.defaultCharset()));
+ loggingHandler::publish, SYSTEM_OUT_LOG_NAME, Level.INFO,
Charset.defaultCharset()));
System.setErr(
JulHandlerPrintStreamAdapterFactory.create(
- loggingHandler, SYSTEM_ERR_LOG_NAME, Level.SEVERE,
Charset.defaultCharset()));
+ loggingHandler::publish,
+ SYSTEM_ERR_LOG_NAME,
+ Level.SEVERE,
+ Charset.defaultCharset()));
// Initialize the SDK Logging Handler, which will only be used for the
LoggingService
sdkLoggingHandler = makeLoggingHandler(SDK_FILEPATH_PROPERTY,
DEFAULT_SDK_LOGGING_LOCATION);
@@ -188,6 +195,65 @@ public class DataflowWorkerLoggingInitializer {
}
}
+ private static class LevelOverrides {
+ LevelOverrides(Level disk, Level direct) {
+ this.direct = direct;
+ this.disk = disk;
+ }
+
+ final Level disk;
+ final Level direct;
+
+ LevelOverrides merge(@Nullable Level disk, @Nullable Level direct) {
+ return new LevelOverrides(
+ disk == null ? this.disk : disk, direct == null ? this.direct :
direct);
+ }
+ }
+
+ private static void applyOverridesToLogger(
+ Logger logger,
+ LevelOverrides overrides,
+ Level defaultNonDirectLogLevel,
+ boolean directLoggingEnabled) {
+ if (!directLoggingEnabled) {
+ logger.setLevel(overrides.disk);
+ if (overrides.direct.intValue() != Level.OFF.intValue()) {
+ LOG.warn(
+ "Ignoring the direct logging level override for {} because
--defaultWorkerDirectLoggerLevel was OFF.",
+ logger.getName());
+ }
+ }
+ return;
+ }
+
+ // Configure the logger to accept logs of the lower value. The log will be
sent directly based
+ // upon the default
+ // nonDirectLogLevel or the provided hint.
+ if (overrides.direct.intValue() < overrides.disk.intValue()) {
+ logger.setLevel(overrides.direct);
+ if (overrides.disk.intValue() != defaultNonDirectLogLevel.intValue()) {
+ logger.setResourceBundle(
+
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(overrides.disk));
+ }
+ } else {
+ if (overrides.direct.intValue() != Level.OFF.intValue()) {
+ LOG.warn(
+ "Ignoring the direct logging level for {} as it is greater or
equal to the disk logging level, {} vs {}",
+ logger.getName(),
+ overrides.direct,
+ overrides.disk);
+ }
+ logger.setLevel(overrides.disk);
+ if (overrides.disk.intValue() < defaultNonDirectLogLevel.intValue()) {
+ // We need to provide the hint as otherwise the default would be used
and send the log
+ // directly which is not
+ // desired. We don't bother if the threshold was increased as the
right decision would still
+ // be made.
+ logger.setResourceBundle(
+
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(overrides.disk));
+ }
+ }
+ }
+
/** Reconfigures logging with the passed in options. */
public static synchronized void configure(DataflowWorkerLoggingOptions
options) {
if (!initialized) {
@@ -200,38 +266,105 @@ public class DataflowWorkerLoggingInitializer {
SdkHarnessOptions harnessOptions = options.as(SdkHarnessOptions.class);
boolean usedDeprecated = false;
- // default value for both DefaultSdkHarnessLogLevel and
DefaultWorkerLogLevel are INFO
- Level overrideLevel =
getJulLevel(harnessOptions.getDefaultSdkHarnessLogLevel());
+ // default value for both DefaultSdkHarnessLogLevel and
DefaultWorkerLogLevel are INFO for disk
+ // logging and OFF for direct logging.
+ LevelOverrides defaultOverrides = new LevelOverrides(Level.INFO,
Level.OFF);
if (options.getDefaultWorkerLogLevel() != null &&
options.getDefaultWorkerLogLevel() != INFO) {
- overrideLevel = getJulLevel(options.getDefaultWorkerLogLevel());
+ defaultOverrides =
+
defaultOverrides.merge(getJulLevel(options.getDefaultWorkerLogLevel()), null);
usedDeprecated = true;
+ } else {
+ defaultOverrides =
+
defaultOverrides.merge(getJulLevel(harnessOptions.getDefaultSdkHarnessLogLevel()),
null);
+ }
+ boolean directLoggingEnabled = false;
+ if (options.getDefaultWorkerDirectLoggerLevel() != null
+ && options.getDefaultWorkerDirectLoggerLevel() != OFF) {
+ Level diskLevel = defaultOverrides.disk;
+ Level directLevel =
getJulLevel(options.getDefaultWorkerDirectLoggerLevel());
+ LOG.info(
+ "Enabling sending logs directly to Cloud Logging between severity {}
and severity {}.",
+ directLevel,
+ diskLevel);
+ try {
+ loggingHandler.enableDirectLogging(options, diskLevel,
testDirectLoggingInterceptor);
+ LOG.info("Configuring sending to cloud logging directly succeeded.");
+ defaultOverrides = defaultOverrides.merge(null, directLevel);
+ directLoggingEnabled = true;
+ } catch (RuntimeException e) {
+ LOG.error("Unable to configure logs to be sent directly to cloud
logging", e);
+ }
}
-
LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).setLevel(overrideLevel);
+ final LevelOverrides finalDefaultOverrides = defaultOverrides;
+ HashMap<String, LevelOverrides> loggerLevelOverrides = new HashMap<>();
if (options.getWorkerLogLevelOverrides() != null) {
- for (Map.Entry<String, DataflowWorkerLoggingOptions.Level>
loggerOverride :
- options.getWorkerLogLevelOverrides().entrySet()) {
- Logger logger = Logger.getLogger(loggerOverride.getKey());
- logger.setLevel(getJulLevel(loggerOverride.getValue()));
- configuredLoggers.add(logger);
- }
+ options
+ .getWorkerLogLevelOverrides()
+ .forEach(
+ (k, v) -> {
+ loggerLevelOverrides.compute(
+ k,
+ (lk, lv) -> {
+ if (lv == null) {
+ lv = finalDefaultOverrides;
+ }
+ return lv.merge(getJulLevel(v), null);
+ });
+ });
usedDeprecated = true;
} else if (harnessOptions.getSdkHarnessLogLevelOverrides() != null) {
- for (Map.Entry<String, SdkHarnessOptions.LogLevel> loggerOverride :
- harnessOptions.getSdkHarnessLogLevelOverrides().entrySet()) {
- Logger logger = Logger.getLogger(loggerOverride.getKey());
- logger.setLevel(getJulLevel(loggerOverride.getValue()));
- configuredLoggers.add(logger);
- }
+ harnessOptions
+ .getSdkHarnessLogLevelOverrides()
+ .forEach(
+ (k, v) -> {
+ loggerLevelOverrides.compute(
+ k,
+ (lk, lv) -> {
+ if (lv == null) {
+ lv = finalDefaultOverrides;
+ }
+ return lv.merge(getJulLevel(v), null);
+ });
+ });
+ }
+ if (options.getWorkerDirectLogLevelOverrides() != null) {
+ options
+ .getWorkerDirectLogLevelOverrides()
+ .forEach(
+ (k, v) -> {
+ loggerLevelOverrides.compute(
+ k,
+ (lk, lv) -> {
+ if (lv == null) {
+ lv = finalDefaultOverrides;
+ }
+ return lv.merge(null, getJulLevel(v));
+ });
+ });
}
+ applyOverridesToLogger(
+ LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME),
+ defaultOverrides,
+ finalDefaultOverrides.disk,
+ directLoggingEnabled);
+ boolean finalDirectLoggingEnabled = directLoggingEnabled;
+ loggerLevelOverrides.forEach(
+ (loggerName, levelOverrides) -> {
+ Logger logger = Logger.getLogger(loggerName);
+ applyOverridesToLogger(
+ logger, levelOverrides, finalDefaultOverrides.disk,
finalDirectLoggingEnabled);
+ configuredLoggers.add(logger);
+ });
+
// If the options specify a level for messages logged to System.out/err,
we need to reconfigure
// the corresponding stream adapter.
if (options.getWorkerSystemOutMessageLevel() != null) {
System.out.close();
System.setOut(
JulHandlerPrintStreamAdapterFactory.create(
- loggingHandler,
+ loggingHandler::publishStdOut,
SYSTEM_OUT_LOG_NAME,
getJulLevel(options.getWorkerSystemOutMessageLevel()),
Charset.defaultCharset()));
@@ -241,13 +374,14 @@ public class DataflowWorkerLoggingInitializer {
System.err.close();
System.setErr(
JulHandlerPrintStreamAdapterFactory.create(
- loggingHandler,
+ loggingHandler::publishStdErr,
SYSTEM_ERR_LOG_NAME,
getJulLevel(options.getWorkerSystemErrMessageLevel()),
Charset.defaultCharset()));
}
if (harnessOptions.getLogMdc()) {
+ LOG.info("Due to logMdc option, MDC fields will be added to custom_data
field of logs.");
loggingHandler.setLogMdc(true);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
index 7e4f3a2cb18..92b27927f59 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
@@ -31,6 +31,7 @@ import java.nio.charset.CodingErrorAction;
import java.util.Formatter;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
@@ -62,7 +63,7 @@ class JulHandlerPrintStreamAdapterFactory {
/** Hold reference of named logger to check configured {@link Level}. */
private final Logger logger;
- private final Handler handler;
+ private final Consumer<LogRecord> handler;
private final String loggerName;
private final Level messageLevel;
@@ -79,7 +80,7 @@ class JulHandlerPrintStreamAdapterFactory {
private ByteArrayOutputStream carryOverBytes;
private JulHandlerPrintStream(
- Handler handler, String loggerName, Level logLevel, Charset charset)
+ Consumer<LogRecord> handler, String loggerName, Level logLevel,
Charset charset)
throws UnsupportedEncodingException {
super(
new OutputStream() {
@@ -406,11 +407,11 @@ class JulHandlerPrintStreamAdapterFactory {
if (OUTPUT_WARNING.compareAndSet(false, true)) {
LogRecord log = new LogRecord(Level.WARNING, LOGGING_DISCLAIMER);
log.setLoggerName(loggerName);
- handler.publish(log);
+ handler.accept(log);
}
LogRecord log = new LogRecord(messageLevel, message);
log.setLoggerName(loggerName);
- handler.publish(log);
+ handler.accept(log);
}
}
}
@@ -420,7 +421,7 @@ class JulHandlerPrintStreamAdapterFactory {
* specified {@code loggerName} and {@code level}.
*/
static PrintStream create(
- Handler handler, String loggerName, Level messageLevel, Charset charset)
{
+ Consumer<LogRecord> handler, String loggerName, Level messageLevel,
Charset charset) {
try {
return new JulHandlerPrintStream(handler, loggerName, messageLevel,
charset);
} catch (UnsupportedEncodingException exc) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
index 3191228687c..0f4752de169 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandlerTest.java
@@ -19,26 +19,44 @@ package org.apache.beam.runners.dataflow.worker.logging;
import static
org.apache.beam.runners.dataflow.worker.NameContextsForTests.nameContextForTest;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.cloud.logging.LogEntry;
+import com.google.cloud.logging.Payload;
+import com.google.cloud.logging.Severity;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.SimpleFormatter;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.NameContextsForTests;
import
org.apache.beam.runners.dataflow.worker.TestOperationContext.TestDataflowExecutionState;
+import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler.DirectLoggingThrottler;
import
org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
-import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.Timestamp;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -88,48 +106,60 @@ public class DataflowWorkerLoggingHandlerTest {
/** Encodes a LogRecord into a Json string. */
private static String createJson(LogRecord record) throws IOException {
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
- DataflowWorkerLoggingHandler handler = new
DataflowWorkerLoggingHandler(factory, 0);
- // Format the record as JSON.
- handler.publish(record);
- // Decode the binary output as UTF-8 and return the generated string.
- return new String(output.toByteArray(), StandardCharsets.UTF_8);
+ return createJson(record, null, null);
}
- private static String createJson(LogRecord record, Formatter formatter)
throws IOException {
+ private static String createJson(
+ LogRecord record, @Nullable Formatter formatter, @Nullable Boolean
enableMdc)
+ throws IOException {
ByteArrayOutputStream output = new ByteArrayOutputStream();
FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
DataflowWorkerLoggingHandler handler = new
DataflowWorkerLoggingHandler(factory, 0);
- handler.setFormatter(formatter);
+ if (formatter != null) {
+ handler.setFormatter(formatter);
+ }
+ if (enableMdc != null) {
+ handler.setLogMdc(enableMdc);
+ }
// Format the record as JSON.
handler.publish(record);
// Decode the binary output as UTF-8 and return the generated string.
return new String(output.toByteArray(), StandardCharsets.UTF_8);
}
- private static String createJsonWithCustomMdc(LogRecord record) throws
IOException {
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
- DataflowWorkerLoggingHandler handler = new
DataflowWorkerLoggingHandler(factory, 0);
- handler.setLogMdc(true);
- // Format the record as JSON.
- handler.publish(record);
- // Decode the binary output as UTF-8 and return the generated string.
- return new String(output.toByteArray(), StandardCharsets.UTF_8);
+ private static LogEntry createLogEntry(LogRecord record) throws IOException {
+ return createLogEntry(record, null, null);
}
- /**
- * Encodes a {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry}
into a Json string.
- */
- private static String createJson(BeamFnApi.LogEntry record) throws
IOException {
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- FixedOutputStreamFactory factory = new FixedOutputStreamFactory(output);
+ private static PipelineOptions pipelineOptionsForTest() {
+ PipelineOptionsFactory.register(DataflowWorkerHarnessOptions.class);
+ return PipelineOptionsFactory.fromArgs(
+ "--jobId=testJobId",
+ "--jobName=testJobName",
+ "--region=testRegion",
+ "--project=testProject",
+ "--workerId=testWorkerName",
+ "--directLoggingCooldownSeconds=10000")
+ .create();
+ }
+
+ private static LogEntry createLogEntry(
+ LogRecord record, @Nullable Formatter formatter, @Nullable Boolean
enableMdc)
+ throws IOException {
+ ByteArrayOutputStream fileOutput = new ByteArrayOutputStream();
+ FixedOutputStreamFactory factory = new
FixedOutputStreamFactory(fileOutput);
DataflowWorkerLoggingHandler handler = new
DataflowWorkerLoggingHandler(factory, 0);
- // Format the record as JSON.
- handler.publish(record);
- // Decode the binary output as UTF-8 and return the generated string.
- return new String(output.toByteArray(), StandardCharsets.UTF_8);
+ if (formatter != null) {
+ handler.setFormatter(formatter);
+ }
+ if (enableMdc != null) {
+ handler.setLogMdc(enableMdc);
+ }
+ handler.enableDirectLogging(pipelineOptionsForTest(), Level.SEVERE, (e) ->
{});
+ return handler.constructDirectLogEntry(
+ record,
+ (DataflowExecutionState)
ExecutionStateTracker.getCurrentExecutionState(),
+ ImmutableMap.of());
}
private final ExecutionStateTracker tracker =
ExecutionStateTracker.newForTest();
@@ -164,7 +194,7 @@ public class DataflowWorkerLoggingHandlerTest {
new DataflowWorkerLoggingHandler(factory, expected.length() + 1 /*
sizelimit */);
// Using |expected|+1 for size limit means that we will rollover after
writing 2 log messages.
- // We thus expect to see 2 messsages written to 'first' and 1 message to
'second',
+ // We thus expect to see 2 messages written to 'first' and 1 message to
'second',
handler.publish(record);
handler.publish(record);
@@ -201,7 +231,6 @@ public class DataflowWorkerLoggingHandlerTest {
DataflowWorkerLoggingMDC.setWorkerId(testWorkerId);
DataflowWorkerLoggingMDC.setWorkId(testWorkId);
- createLogRecord("test.message", null /* throwable */);
assertEquals(
String.format(
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
@@ -250,7 +279,7 @@ public class DataflowWorkerLoggingHandlerTest {
+
"\"message\":\"testMdcValue:test.message\",\"thread\":\"2\",\"job\":\"testJobId\","
+
"\"worker\":\"testWorkerId\",\"work\":\"testWorkId\",\"logger\":\"LoggerName\"}"
+ System.lineSeparator(),
- createJson(createLogRecord("test.message", null /* throwable */),
customFormatter));
+ createJson(createLogRecord("test.message", null /* throwable */),
customFormatter, null));
}
}
@@ -316,7 +345,7 @@ public class DataflowWorkerLoggingHandlerTest {
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
+
"\"message\":\"test.message\",\"thread\":\"2\",\"logger\":\"LoggerName\"}"
+ System.lineSeparator(),
- createJsonWithCustomMdc(createLogRecord("test.message", null)));
+ createJson(createLogRecord(), null, true));
}
@Test
@@ -327,7 +356,7 @@ public class DataflowWorkerLoggingHandlerTest {
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
+
"\"message\":\"test.message\",\"thread\":\"2\",\"logger\":\"LoggerName\"}"
+ System.lineSeparator(),
- createJson(createLogRecord("test.message", null)));
+ createJson(createLogRecord()));
}
}
@@ -340,7 +369,7 @@ public class DataflowWorkerLoggingHandlerTest {
+
"\"message\":\"test.message\",\"thread\":\"2\",\"logger\":\"LoggerName\","
+ "\"custom_data\":{\"key1\":\"cool
value\",\"key2\":\"another\"}}"
+ System.lineSeparator(),
- createJsonWithCustomMdc(createLogRecord("test.message", null)));
+ createJson(createLogRecord(), null, true));
}
}
@@ -358,33 +387,380 @@ public class DataflowWorkerLoggingHandlerTest {
createJson(createLogRecord(null /* message */, null /* throwable */)));
}
+  // Tests the direct logging path end to end, except for actually sending to
Cloud Logging.
@Test
- public void testBeamFnApiLogEntry() throws IOException {
- DataflowWorkerLoggingMDC.setJobId("testJobId");
+ public void testDirectLoggingEndToEnd() throws IOException {
DataflowWorkerLoggingMDC.setWorkerId("testWorkerId");
DataflowWorkerLoggingMDC.setWorkId("testWorkId");
+ ByteArrayOutputStream fileOutput = new ByteArrayOutputStream();
+ FixedOutputStreamFactory factory = new
FixedOutputStreamFactory(fileOutput);
+ AtomicReference<LogEntry> capturedEntry = new AtomicReference<>();
+ DataflowWorkerLoggingHandler handler = new
DataflowWorkerLoggingHandler(factory, 0);
+ handler.enableDirectLogging(
+ pipelineOptionsForTest(),
+ Level.SEVERE,
+ (LogEntry e) -> assertNull(capturedEntry.getAndSet(e)));
+ handler.publish(createLogRecord("test.message", null /* throwable */));
+ assertEquals("", new String(fileOutput.toByteArray(),
StandardCharsets.UTF_8));
+
+ LogEntry entry = capturedEntry.get();
+ assertNotNull(entry);
+ assertEquals(Instant.ofEpochMilli(1), entry.getInstantTimestamp());
+ assertEquals(Severity.INFO, entry.getSeverity());
+
+ assertNotNull(entry.getResource());
+ assertEquals("dataflow_step", entry.getResource().getType());
assertEquals(
+ ImmutableMap.of(
+ "job_name",
+ "testJobName",
+ "job_id",
+ "testJobId",
+ "region",
+ "testRegion",
+ "step_id",
+ "",
+ "project_id",
+ "testProject"),
+ entry.getResource().getLabels());
+
+ assertTrue(entry.getPayload() instanceof Payload.JsonPayload);
+ Payload.JsonPayload jsonPayload = entry.getPayload();
+ Map<String, Object> result = jsonPayload.getDataAsMap();
+ assertEquals("test.message", result.get("message"));
+ assertEquals("2", result.get("thread"));
+ assertEquals("LoggerName", result.get("logger"));
+ assertEquals("testWorkerId", result.get("worker"));
+ assertEquals("testWorkId", result.get("work"));
+
+ Map<String, String> labels = entry.getLabels();
+ assertEquals("testJobId", labels.get("dataflow.googleapis.com/job_id"));
+ assertEquals("testJobName",
labels.get("dataflow.googleapis.com/job_name"));
+ assertEquals("testRegion", labels.get("dataflow.googleapis.com/region"));
+ assertEquals("testWorkerName",
labels.get("compute.googleapis.com/resource_name"));
+ assertEquals("instance",
labels.get("compute.googleapis.com/resource_type"));
+ }
+
+ private static Object messageInEntry(LogEntry entry) {
+ return ((Payload.JsonPayload)
entry.getPayload()).getDataAsMap().get("message");
+ }
+
+ @Test
+ public void testDirectLoggingFallback() throws IOException {
+ ByteArrayOutputStream fileOutput = new ByteArrayOutputStream();
+ FixedOutputStreamFactory factory = new
FixedOutputStreamFactory(fileOutput);
+ ConcurrentLinkedDeque<LogEntry> capturedEntries = new
ConcurrentLinkedDeque<>();
+ final AtomicBoolean failNext = new AtomicBoolean(false);
+ DataflowWorkerLoggingHandler handler = new
DataflowWorkerLoggingHandler(factory, 0);
+ Consumer<LogEntry> directLogConsumer =
+ (LogEntry e) -> {
+ if (failNext.get()) {
+ throw new RuntimeException("Failure");
+ }
+ capturedEntries.add(e);
+ };
+ handler.enableDirectLogging(pipelineOptionsForTest(), Level.SEVERE,
directLogConsumer);
+ handler.publish(createLogRecord("test.message", null /* throwable */));
+ assertEquals("", new String(fileOutput.toByteArray(),
StandardCharsets.UTF_8));
+ assertEquals(1, capturedEntries.size());
+ assertEquals("test.message", messageInEntry(capturedEntries.getFirst()));
+ failNext.set(true);
+ capturedEntries.clear();
+
+    // This message should fail to send directly and instead fall back to disk.
+ handler.publish(createLogRecord("test.message2", null /* throwable */));
+
+ String fileContents = new String(fileOutput.toByteArray(),
StandardCharsets.UTF_8);
+ String fallbackMsg =
+ "\"severity\":\"ERROR\",\"message\":\"Failed to buffer log to send
directly to cloud logging, falling back to normal logging path for PT2H46M40S.
Consider increasing the buffer size with --workerDirectLoggerBufferByteLimit
and --workerDirectLoggerBufferElementLimit.";
+    assertTrue(fileContents + " didn't contain " + fallbackMsg,
fileContents.contains(fallbackMsg));
+ String expected =
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
- +
"\"message\":\"test.message\",\"thread\":\"2\",\"job\":\"testJobId\","
- +
"\"worker\":\"testWorkerId\",\"work\":\"1\",\"logger\":\"LoggerName\"}"
- + System.lineSeparator(),
- createJson(createLogEntry("test.message")));
+ +
"\"message\":\"test.message2\",\"thread\":\"2\",\"logger\":\"LoggerName\"}";
+ assertTrue(fileContents + " didn't contain " + expected,
fileContents.contains(expected));
+ assertEquals(0, capturedEntries.size());
+ fileOutput.reset();
+
+ // This message should also be sent to disk because we are backing off.
+ failNext.set(false);
+ handler.publish(createLogRecord("test.message3", null /* throwable */));
+ String expected2 =
+
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
+ +
"\"message\":\"test.message3\",\"thread\":\"2\",\"logger\":\"LoggerName\"}"
+ + System.lineSeparator();
+ assertEquals(expected2, new String(fileOutput.toByteArray(),
StandardCharsets.UTF_8));
+ assertEquals(0, capturedEntries.size());
}
@Test
- public void testBeamFnApiLogEntryWithTrace() throws IOException {
- DataflowWorkerLoggingMDC.setJobId("testJobId");
- DataflowWorkerLoggingMDC.setWorkerId("testWorkerId");
- DataflowWorkerLoggingMDC.setWorkId("testWorkId");
+ public void testConstructDirectLogEntryWithoutStage() throws IOException {
+ LogEntry entry = createLogEntry(createLogRecord("test.message", null /*
throwable */));
+ assertEquals(
+ Payload.JsonPayload.of(
+ ImmutableMap.of("message", "test.message", "thread", "2",
"logger", "LoggerName")),
+ entry.getPayload());
+ assertEquals(Severity.INFO, entry.getSeverity());
+ assertNull(entry.getResource());
+ }
+
+ @Test
+ public void testConstructLogEntryWithAllValuesInMDC() throws IOException {
+ DataflowExecutionState state = new
TestDataflowExecutionState(nameContextForTest(), "activity");
+ tracker.enterState(state);
+
+ String testStage = "testStage";
+ String testWorkerId = "testWorkerId";
+ String testWorkId = "testWorkId";
+ String testJobId = "testJobId";
+ DataflowWorkerLoggingMDC.setStageName(testStage);
+ DataflowWorkerLoggingMDC.setWorkerId(testWorkerId);
+ DataflowWorkerLoggingMDC.setWorkId(testWorkId);
+ DataflowWorkerLoggingMDC.setJobId(testJobId);
+
+ LogEntry entry = createLogEntry(createLogRecord("test.message", null /*
throwable */));
assertEquals(
-
"{\"timestamp\":{\"seconds\":0,\"nanos\":1000000},\"severity\":\"INFO\","
- +
"\"message\":\"test.message\",\"thread\":\"2\",\"job\":\"testJobId\","
- +
"\"worker\":\"testWorkerId\",\"work\":\"1\",\"logger\":\"LoggerName\","
- + "\"exception\":\"testTrace\"}"
- + System.lineSeparator(),
-
createJson(createLogEntry("test.message").toBuilder().setTrace("testTrace").build()));
+ Payload.JsonPayload.of(
+ ImmutableMap.of(
+ "message",
+ "test.message",
+ "thread",
+ "2",
+ "logger",
+ "LoggerName",
+ "stage",
+ testStage,
+ "step",
+ NameContextsForTests.USER_NAME,
+ "worker",
+ testWorkerId,
+ "work",
+ testWorkId,
+ "job",
+ testJobId)),
+ entry.getPayload());
+ assertEquals(Severity.INFO, entry.getSeverity());
+ assertEquals("dataflow_step", entry.getResource().getType());
+ // The testing setup just sets the additional label, not all the default
labels.
+ assertEquals(
+ ImmutableMap.of("step_id", NameContextsForTests.USER_NAME),
+ entry.getResource().getLabels());
+ }
+
+ @Test
+ public void testDirectLoggingWithMessageUsingCustomFormatter() throws
IOException {
+ Formatter customFormatter =
+ new SimpleFormatter() {
+ @Override
+ public synchronized String formatMessage(LogRecord record) {
+ return MDC.get("testMdcKey") + ":" + super.formatMessage(record);
+ }
+ };
+ try (MDC.MDCCloseable ignored = MDC.putCloseable("testMdcKey",
"testMdcValue")) {
+ LogEntry entry =
+ createLogEntry(
+ createLogRecord("test.message", null /* throwable */),
customFormatter, null);
+ assertEquals(
+ Payload.JsonPayload.of(
+ ImmutableMap.of(
+ "message", "testMdcValue:test.message", "thread", "2",
"logger", "LoggerName")),
+ entry.getPayload());
+ }
+ }
+
+ @Test
+ public void testDirectLoggingWithMessageRequiringJulFormatting() throws
IOException {
+ LogEntry entry =
+ createLogEntry(createLogRecord("test.message {0}", null /* throwable
*/, "myFormatString"));
+ assertEquals(
+ Payload.JsonPayload.of(
+ ImmutableMap.of(
+ "message", "test.message myFormatString", "thread", "2",
"logger", "LoggerName")),
+ entry.getPayload());
+ }
+
+ @Test
+ public void testDirectLoggingWithMessageAndException() throws IOException {
+ Throwable t = createThrowable();
+ LogEntry entry = createLogEntry(createLogRecord("test.message", t));
+ assertEquals(
+ Payload.JsonPayload.of(
+ ImmutableMap.of(
+ "message",
+ "test.message",
+ "thread",
+ "2",
+ "logger",
+ "LoggerName",
+ "exception",
+ "java.lang.Throwable: exception.test.message"
+ + System.lineSeparator()
+ + "\tat declaringClass1.method1(file1.java:1)"
+ + System.lineSeparator()
+ + "\tat declaringClass2.method2(file2.java:1)"
+ + System.lineSeparator()
+ + "\tat declaringClass3.method3(file3.java:1)"
+ + System.lineSeparator())),
+ entry.getPayload());
+ }
+
+ @Test
+ public void testDirectLoggingWithException() throws IOException {
+ Throwable t = createThrowable();
+ LogEntry entry = createLogEntry(createLogRecord(null, t));
+ assertEquals(
+ Payload.JsonPayload.of(
+ ImmutableMap.of(
+ "thread",
+ "2",
+ "logger",
+ "LoggerName",
+ "exception",
+ "java.lang.Throwable: exception.test.message"
+ + System.lineSeparator()
+ + "\tat declaringClass1.method1(file1.java:1)"
+ + System.lineSeparator()
+ + "\tat declaringClass2.method2(file2.java:1)"
+ + System.lineSeparator()
+ + "\tat declaringClass3.method3(file3.java:1)"
+ + System.lineSeparator())),
+ entry.getPayload());
+ }
+
+ @Test
+ public void testDirectLoggingWithCustomDataEnabledNoMdc() throws IOException
{
+ LogEntry entry = createLogEntry(createLogRecord(), null, true);
+ assertEquals(
+ Payload.JsonPayload.of(
+ ImmutableMap.of("message", "test.message", "thread", "2",
"logger", "LoggerName")),
+ entry.getPayload());
+ }
+
+ @Test
+ public void testDirectLoggingWithCustomDataDisabledWithMdc() throws
IOException {
+ MDC.clear();
+ try (MDC.MDCCloseable closeable = MDC.putCloseable("key1", "cool value")) {
+ LogEntry entry = createLogEntry(createLogRecord());
+ assertEquals(
+ Payload.JsonPayload.of(
+ ImmutableMap.of("message", "test.message", "thread", "2",
"logger", "LoggerName")),
+ entry.getPayload());
+ }
+ }
+
+ @Test
+ public void testDirectLoggingWithCustomDataEnabledWithMdc() throws
IOException {
+ MDC.clear();
+ try (MDC.MDCCloseable ignored = MDC.putCloseable("key1", "cool value");
+ MDC.MDCCloseable ignored2 = MDC.putCloseable("key2", "another")) {
+ LogEntry entry = createLogEntry(createLogRecord(), null, true);
+ assertEquals(
+ Payload.JsonPayload.of(
+ ImmutableMap.of(
+ "message",
+ "test.message",
+ "thread",
+ "2",
+ "logger",
+ "LoggerName",
+ "custom_data",
+ ImmutableMap.of("key1", "cool value", "key2", "another"))),
+ entry.getPayload());
+ }
+ }
+
+ @Test
+ public void testDirectLoggingWithoutExceptionOrMessage() throws IOException {
+ LogEntry entry = createLogEntry(createLogRecord(null, null));
+ assertEquals(
+ Payload.JsonPayload.of(ImmutableMap.of("thread", "2", "logger",
"LoggerName")),
+ entry.getPayload());
+ }
+
+ @Test
+ public void isConfiguredDirectLog() throws IOException {
+ ByteArrayOutputStream fileOutput = new ByteArrayOutputStream();
+ FixedOutputStreamFactory factory = new
FixedOutputStreamFactory(fileOutput);
+
+ // Using the default log level to determine.
+ LogRecord record = createLogRecord();
+ record.setLevel(Level.WARNING);
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.WARNING));
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.INFO));
+ record.setLevel(Level.SEVERE);
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.WARNING));
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.INFO));
+ record.setLevel(Level.INFO);
+ assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.WARNING));
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.INFO));
+ record.setLevel(Level.FINE);
+ assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.WARNING));
+ assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.INFO));
+
+ // Using an override to determine
+ record.setLevel(Level.INFO);
+ record.setResourceBundle(
+
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.INFO));
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.WARNING));
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.INFO));
+ record.setResourceBundle(
+
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.FINE));
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.WARNING));
+ assertFalse(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.INFO));
+ record.setResourceBundle(
+
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.WARNING));
+ assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.WARNING));
+ assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.INFO));
+ record.setResourceBundle(
+
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.SEVERE));
+ assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.WARNING));
+ assertTrue(DataflowWorkerLoggingHandler.isConfiguredDirectLog(record,
Level.INFO));
+ }
+
+ @Test
+ public void testDirectLoggingThrottler() {
+ AtomicReference<Instant> fakeNow = new AtomicReference<>(Instant.EPOCH);
+ DirectLoggingThrottler throttler = new
DirectLoggingThrottler(fakeNow::get);
+ assertTrue(throttler.shouldAttemptDirectLog());
+
+ throttler.setCooldownDuration(Duration.ofSeconds(10));
+
+ // First failure should trigger cooldown and return a message.
+ assertNotNull(throttler.noteDirectLoggingEnqueueFailure());
+ // Subsequent failures during cooldown should not return a message.
+ assertNull(throttler.noteDirectLoggingEnqueueFailure());
+ assertFalse(throttler.shouldAttemptDirectLog());
+
+ for (int i = 0; i < 9; ++i) {
+ fakeNow.set(Instant.ofEpochSecond(i));
+ assertFalse(throttler.shouldAttemptDirectLog());
+ }
+ fakeNow.set(Instant.ofEpochSecond(10));
+ assertTrue(throttler.shouldAttemptDirectLog());
+
+ // Note success and ensure we can still log.
+ throttler.noteDirectLoggingEnqueueSuccess();
+ assertTrue(throttler.shouldAttemptDirectLog());
+
+ fakeNow.set(Instant.ofEpochSecond(400));
+
+ // Fail again, then immediately note success to reset.
+ assertNotNull(throttler.noteDirectLoggingEnqueueFailure());
+ assertFalse(throttler.shouldAttemptDirectLog());
+ throttler.noteDirectLoggingEnqueueSuccess();
+ assertTrue(throttler.shouldAttemptDirectLog());
+
+ // Test error path.
+ assertNotNull(throttler.noteDirectLoggingError());
+ assertFalse(throttler.shouldAttemptDirectLog());
+ assertNull(throttler.noteDirectLoggingError());
+ for (int i = 0; i < 9; ++i) {
+ fakeNow.set(Instant.ofEpochSecond(400 + i));
+ assertFalse(throttler.shouldAttemptDirectLog());
+ }
+ fakeNow.set(Instant.ofEpochSecond(410));
+ assertTrue(throttler.shouldAttemptDirectLog());
}
/** @return A throwable with a fixed stack trace. */
@@ -399,6 +775,11 @@ public class DataflowWorkerLoggingHandlerTest {
return throwable;
}
+ /** Creates and returns a simple test LogRecord. */
+ private LogRecord createLogRecord() {
+ return createLogRecord("test.message", null);
+ }
+
/**
* Creates and returns a LogRecord with a given message and throwable.
*
@@ -416,23 +797,4 @@ public class DataflowWorkerLoggingHandlerTest {
logRecord.setParameters(params);
return logRecord;
}
-
- /**
- * Creates and returns a BeamFnApi.LogEntry with a given message and
throwable.
- *
- * @param message The message to place in the {@link
- * org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry}
- * @return A {@link org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry}
with the given
- * message.
- */
- private BeamFnApi.LogEntry createLogEntry(String message) {
- return BeamFnApi.LogEntry.newBuilder()
- .setLogLocation("LoggerName")
- .setSeverity(BeamFnApi.LogEntry.Severity.Enum.INFO)
- .setMessage(message)
- .setInstructionId("1")
- .setThread("2")
- .setTimestamp(Timestamp.newBuilder().setSeconds(0).setNanos(1 *
1000000))
- .build();
- }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
index 425b2140a96..7537c17a17b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingInitializerTest.java
@@ -29,6 +29,8 @@ import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
@@ -43,6 +45,7 @@ import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
+import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import
org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions.WorkerLogLevelOverrides;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -64,7 +67,7 @@ import org.slf4j.LoggerFactory;
* Unit tests for {@link DataflowWorkerLoggingInitializer}.
*
* <p>Tests which validate written log messages should assume that other
background tasks may
- * concurrently be writing log messages, since registered log handlers are
global. Therefore it is
+ * concurrently be writing log messages, since registered log handlers are
global. Therefore, it is
* not safe to assert on log counts or whether the retrieved log collection is
empty.
*/
@RunWith(JUnit4.class)
@@ -74,16 +77,14 @@ public class DataflowWorkerLoggingInitializerTest {
@Rule public RestoreSystemProperties restoreProperties = new
RestoreSystemProperties();
- // Should match {@link DataflowWorkerLoggingInitializer#FILEPATH_PROPERTY}
- private static final String LOGPATH_PROPERTY =
"dataflow.worker.logging.filepath";
-
@Before
public void setUp() {
Path logFileBasePath = Paths.get(logFolder.getRoot().getAbsolutePath(),
"logfile.txt");
- System.setProperty(LOGPATH_PROPERTY, logFileBasePath.toString());
+ System.setProperty(RUNNER_FILEPATH_PROPERTY, logFileBasePath.toString());
LogManager.getLogManager().reset();
DataflowWorkerLoggingInitializer.reset();
DataflowWorkerLoggingInitializer.initialize();
+ DataflowWorkerLoggingInitializer.testDirectLoggingInterceptor = (e) -> {};
}
@After
@@ -102,7 +103,7 @@ public class DataflowWorkerLoggingInitializerTest {
Logger rootLogger = LogManager.getLogManager().getLogger("");
assertEquals(1, rootLogger.getHandlers().length);
assertEquals(Level.INFO, rootLogger.getLevel());
- assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0],
Level.ALL);
+ assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0]);
}
@Test
@@ -116,7 +117,7 @@ public class DataflowWorkerLoggingInitializerTest {
Logger rootLogger = LogManager.getLogManager().getLogger("");
assertEquals(1, rootLogger.getHandlers().length);
assertEquals(Level.WARNING, rootLogger.getLevel());
- assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0],
Level.ALL);
+ assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0]);
}
@Test
@@ -129,7 +130,7 @@ public class DataflowWorkerLoggingInitializerTest {
Logger rootLogger = LogManager.getLogManager().getLogger("");
assertEquals(1, rootLogger.getHandlers().length);
assertEquals(Level.WARNING, rootLogger.getLevel());
- assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0],
Level.ALL);
+ assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0]);
}
@Test
@@ -154,6 +155,26 @@ public class DataflowWorkerLoggingInitializerTest {
assertTrue(aLogger.getUseParentHandlers());
}
+ @Test
+ public void testWithDirectLogging() {
+ DataflowWorkerLoggingOptions options =
+ PipelineOptionsFactory.as(DataflowWorkerLoggingOptions.class);
+ options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.WARN);
+
options.setDefaultWorkerDirectLoggerLevel(DataflowWorkerLoggingOptions.Level.DEBUG);
+
+ DataflowWorkerLoggingInitializer.configure(options);
+
+ Logger rootLogger = LogManager.getLogManager().getLogger("");
+ assertEquals(1, rootLogger.getHandlers().length);
+ assertEquals(Level.FINE, rootLogger.getLevel());
+ DataflowWorkerLoggingHandler handler =
+ assertIsDataflowWorkerLoggingHandler(rootLogger.getHandlers()[0]);
+ assertTrue(handler.testVerifyIsConfiguredDirectLog(new
LogRecord(Level.FINE, "")));
+ assertTrue(handler.testVerifyIsConfiguredDirectLog(new
LogRecord(Level.INFO, "")));
+ assertFalse(handler.testVerifyIsConfiguredDirectLog(new
LogRecord(Level.WARNING, "")));
+ assertFalse(handler.testVerifyIsConfiguredDirectLog(new
LogRecord(Level.SEVERE, "")));
+ }
+
@Test
public void testWithSdkHarnessCustomLogLevels() {
SdkHarnessOptions options =
PipelineOptionsFactory.as(SdkHarnessOptions.class);
@@ -175,9 +196,71 @@ public class DataflowWorkerLoggingInitializerTest {
assertTrue(aLogger.getUseParentHandlers());
}
- private void assertIsDataflowWorkerLoggingHandler(Handler handler, Level
level) {
+ @Test
+ public void testWithSdkHarnessCustomLogLevelsWithDirect() {
+ SdkHarnessOptions options =
PipelineOptionsFactory.as(SdkHarnessOptions.class);
+ options.setDefaultSdkHarnessLogLevel(SdkHarnessOptions.LogLevel.WARN);
+ String a = "testWithSdkHarnessCustomLogLevelsWithDirectA";
+ String b = "testWithSdkHarnessCustomLogLevelsWithDirectB";
+ String c = "testWithSdkHarnessCustomLogLevelsWithDirectC";
+ String d = "testWithSdkHarnessCustomLogLevelsWithDirectD";
+ options.setSdkHarnessLogLevelOverrides(
+ new SdkHarnessLogLevelOverrides()
+ .addOverrideForName(a, SdkHarnessOptions.LogLevel.INFO)
+ .addOverrideForName(b, SdkHarnessOptions.LogLevel.ERROR)
+ .addOverrideForName(c, SdkHarnessOptions.LogLevel.WARN));
+ DataflowWorkerLoggingOptions loggingOptions =
options.as(DataflowWorkerLoggingOptions.class);
+
loggingOptions.setDefaultWorkerDirectLoggerLevel(DataflowWorkerLoggingOptions.Level.DEBUG);
+ loggingOptions.setWorkerDirectLogLevelOverrides(
+ new WorkerLogLevelOverrides()
+ .addOverrideForName(b, DataflowWorkerLoggingOptions.Level.TRACE)
+ .addOverrideForName(c, DataflowWorkerLoggingOptions.Level.ERROR)
+ .addOverrideForName(d, DataflowWorkerLoggingOptions.Level.INFO));
+
+ DataflowWorkerLoggingInitializer.configure(loggingOptions);
+
+ {
+ Logger aLogger = LogManager.getLogManager().getLogger(a);
+ assertNoHandlersButParent(aLogger);
+ assertEquals(Level.FINE, aLogger.getLevel());
+ assertEquals(
+
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.INFO),
+ aLogger.getResourceBundle());
+ }
+
+ {
+ Logger bLogger = LogManager.getLogManager().getLogger(b);
+ assertNoHandlersButParent(bLogger);
+ assertEquals(Level.FINEST, bLogger.getLevel());
+ assertEquals(
+
DataflowWorkerLoggingHandler.resourceBundleForNonDirectLogLevelHint(Level.SEVERE),
+ bLogger.getResourceBundle());
+ }
+
+ {
+ Logger cLogger = LogManager.getLogManager().getLogger(c);
+ assertNoHandlersButParent(cLogger);
+ assertEquals(Level.WARNING, cLogger.getLevel());
+ assertNull(cLogger.getResourceBundle());
+ }
+
+ {
+ Logger dLogger = LogManager.getLogManager().getLogger(d);
+ assertNoHandlersButParent(dLogger);
+ assertEquals(Level.INFO, dLogger.getLevel());
+ assertNull(dLogger.getResourceBundle());
+ }
+ }
+
+ private DataflowWorkerLoggingHandler
assertIsDataflowWorkerLoggingHandler(Handler handler) {
assertThat(handler, instanceOf(DataflowWorkerLoggingHandler.class));
- assertEquals(level, handler.getLevel());
+ assertEquals(Level.ALL, handler.getLevel());
+ return (DataflowWorkerLoggingHandler) handler;
+ }
+
+ private void assertNoHandlersButParent(Logger logger) {
+ assertTrue(logger.getUseParentHandlers());
+ assertEquals(0, logger.getHandlers().length);
}
@Test
@@ -309,10 +392,12 @@ public class DataflowWorkerLoggingInitializerTest {
private List<String> retrieveLogLines() throws IOException {
List<String> allLogLines = Lists.newArrayList();
- for (File logFile : logFolder.getRoot().listFiles()) {
- allLogLines.addAll(Files.readAllLines(logFile.toPath(),
StandardCharsets.UTF_8));
+ @Nullable File[] files = logFolder.getRoot().listFiles();
+ if (files != null) {
+ for (File logFile : files) {
+ allLogLines.addAll(Files.readAllLines(logFile.toPath(),
StandardCharsets.UTF_8));
+ }
}
-
return allLogLines;
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
index 62df1342f1d..e7cbc453c01 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java
@@ -70,7 +70,7 @@ public class JulHandlerPrintStreamAdapterFactoryTest {
public void testLogRecordMetadata() {
PrintStream printStream =
JulHandlerPrintStreamAdapterFactory.create(
- handler, "fooLogger", Level.WARNING, StandardCharsets.UTF_8);
+ handler::publish, "fooLogger", Level.WARNING,
StandardCharsets.UTF_8);
printStream.println("anyMessage");
assertThat(handler.getLogs(), not(empty()));
@@ -207,6 +207,6 @@ public class JulHandlerPrintStreamAdapterFactoryTest {
private PrintStream createPrintStreamAdapter() {
return JulHandlerPrintStreamAdapterFactory.create(
- handler, LOGGER_NAME, Level.INFO, StandardCharsets.UTF_8);
+ handler::publish, LOGGER_NAME, Level.INFO, StandardCharsets.UTF_8);
}
}
diff --git
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
index e566c1446b2..f9d8e5ac350 100644
---
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
+++
b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GceMetadataUtil.java
@@ -77,14 +77,22 @@ public class GceMetadataUtil {
}
public static String fetchDataflowJobId() {
- return GceMetadataUtil.fetchCustomGceMetadata("job_id");
+ return fetchCustomGceMetadata("job_id");
}
public static String fetchDataflowJobName() {
- return GceMetadataUtil.fetchCustomGceMetadata("job_name");
+ return fetchCustomGceMetadata("job_name");
}
public static String fetchDataflowWorkerId() {
- return GceMetadataUtil.fetchVmInstanceMetadata("id");
+ return fetchVmInstanceMetadata("id");
+ }
+
+ public static String fetchDataflowWorkerName() {
+ return fetchVmInstanceMetadata("name");
+ }
+
+ public static String fetchDataflowRegion() {
+ return fetchVmInstanceMetadata("cloud_region");
}
}