scwhittle commented on code in PR #37662:
URL: https://github.com/apache/beam/pull/37662#discussion_r2904153353
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -33,71 +51,145 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
import java.util.Date;
-import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.logging.ErrorManager;
import java.util.logging.Handler;
+import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.SimpleFormatter;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.sdk.options.PipelineOptions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CountingOutputStream;
import org.slf4j.MDC;
/**
* Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception
is represented using
* {@link Throwable#printStackTrace()}.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
+@SuppressWarnings("method.invocation")
public class DataflowWorkerLoggingHandler extends Handler {
- private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String>
- BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL;
-
- static {
- BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new
EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class);
- // Note that Google Cloud Logging only defines a fixed number of
severities and maps "TRACE"
- // onto "DEBUG" as seen here:
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
- //
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE,
"DEBUG");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG,
"DEBUG");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO,
"INFO");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE,
"NOTICE");
- // Note that Google Cloud Logging only defines a fixed number of
severities and maps "WARN" onto
- // "WARNING" as seen here:
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
- //
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN,
"WARNING");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR,
"ERROR");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL,
"CRITICAL");
- }
-
/**
* Buffer size to use when writing logs. This matches <a
* href="https://cloud.google.com/logging/quotas#log-limits">Logging usage
limits</a> to avoid
* spreading the same log entry across multiple disk flushes.
*/
private static final int LOGGING_WRITER_BUFFER_SIZE = 262144; // 256kb
+ // Used as a side-channel for a Logger for which the configured non-direct
logging level doesn't
+ // match the default
+ // logging level and which is using direct logging.
+ private static class DirectHintResourceBundle extends ResourceBundle {
+ private static final String LEVEL_KEY = "NonDirectLogLevel";
+ private final Level nonDirectLogLevel;
+
+ DirectHintResourceBundle(Level directLevel) {
+ this.nonDirectLogLevel = directLevel;
+ }
+
+ @Override
+ public String getBaseBundleName() {
+ return "DataflowWorkerLoggingHandler";
+ }
+
+ @Override
+ protected Object handleGetObject(@Nonnull String s) {
+ if (LEVEL_KEY.equals(s)) {
+ return nonDirectLogLevel;
+ }
+ return new MissingResourceException(
+ "The only valid key is " + LEVEL_KEY, this.getClass().getName(), s);
+ }
+
+ @Override
+ public @Nonnull Enumeration<String> getKeys() {
+ return Iterators.asEnumeration(Iterators.singletonIterator(LEVEL_KEY));
+ }
+ }
+ // Since there are just a couple possible levels, we cache them.
+ private static final ConcurrentHashMap<Level, ResourceBundle>
resourceBundles =
+ new ConcurrentHashMap<>();
+
+ @VisibleForTesting
+ static ResourceBundle resourceBundleForNonDirectLogLevelHint(Level
nonDirectLogLevel) {
+ return resourceBundles.computeIfAbsent(nonDirectLogLevel,
DirectHintResourceBundle::new);
+ }
+
/** If true, add SLF4J MDC to custom_data of the log message. */
+ @LazyInit private boolean logCustomMdc = false;
+
+  // All of the direct-logging-related fields are initialized only if
enableDirectLogging is called;
+  // afterwards they are logically final.
+ @LazyInit private @Nullable Logging directLogging = null;
+ @LazyInit private boolean fallbackDirectErrorsToDisk = false;
+ @LazyInit private Level defaultNonDirectLogLevel = Level.ALL;
+ @LazyInit private Logging.WriteOption[] directWriteOptions = new
Logging.WriteOption[0];
+ @LazyInit private ImmutableMap<String, String> defaultResourceLabels =
ImmutableMap.of();
+
+ @LazyInit
+ private MonitoredResource steplessMonitoredResource =
+ MonitoredResource.newBuilder(RESOURCE_TYPE).build();
+
+ @LazyInit private ImmutableMap<String, String> defaultLabels =
ImmutableMap.of();
+ @LazyInit private @Nullable Consumer<LogEntry> testDirectLogInterceptor;
+
+ private static final String LOG_TYPE = "dataflow.googleapis.com%2Fworker";
Review Comment:
This matches the existing dataflow logs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]