arunpandianp commented on code in PR #37662:
URL: https://github.com/apache/beam/pull/37662#discussion_r2897905519
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -131,28 +222,315 @@ public DataflowWorkerLoggingHandler(String filename,
long sizeLimit) throws IOEx
createOutputStream();
}
- public synchronized void setLogMdc(boolean enabled) {
- this.logCustomMdc = enabled;
+ public void setLogMdc(boolean enabled) {
+ logCustomMdc = enabled;
}
- @Override
- public synchronized void publish(LogRecord record) {
- DataflowExecutionState currrentDataflowState = null;
- ExecutionState currrentState =
ExecutionStateTracker.getCurrentExecutionState();
- if (currrentState instanceof DataflowExecutionState) {
- currrentDataflowState = (DataflowExecutionState) currrentState;
- }
- // It's okay to pass in the null state, publish() handles and tests this.
- publish(currrentDataflowState, record);
+ private void initializeLabelMaps(PipelineOptions options) {
+ DataflowPipelineOptions dataflowOptions =
options.as(DataflowPipelineOptions.class);
+ DataflowWorkerHarnessOptions harnessOptions =
options.as(DataflowWorkerHarnessOptions.class);
+ @Nullable String jobId = harnessOptions.getJobId();
+ if (jobId == null || jobId.isEmpty()) {
+ jobId = GceMetadataUtil.fetchDataflowJobId();
+ }
+ jobId = middleCrop(jobId, LABEL_MAX_LENGTH);
+
+ @Nullable String jobName = dataflowOptions.getJobName();
+ if (jobName == null || jobName.isEmpty()) {
+ jobName = GceMetadataUtil.fetchDataflowJobName();
+ }
+ jobName = middleCrop(jobName, LABEL_MAX_LENGTH);
+
+ String region = dataflowOptions.getRegion();
+ if (region.isEmpty()) {
+ region = GceMetadataUtil.fetchDataflowRegion();
+ }
+ region = middleCrop(region, LABEL_MAX_LENGTH);
+
+ // Note that the id in the options is a string name, not the numeric VM id.
+ @Nullable String workerName = harnessOptions.getWorkerId();
+ if (workerName == null || workerName.isEmpty()) {
+ workerName = GceMetadataUtil.fetchDataflowWorkerName();
+ }
+ workerName = middleCrop(workerName, LABEL_MAX_LENGTH);
+
+ String workerId = middleCrop(GceMetadataUtil.fetchDataflowWorkerId(),
LABEL_MAX_LENGTH);
+
+ @Nullable String projectId = harnessOptions.getProject();
+ if (projectId == null || projectId.isEmpty()) {
+ projectId = checkNotNull(ServiceOptions.getDefaultProjectId());
+ }
+ projectId = middleCrop(projectId, LABEL_MAX_LENGTH);
+
+ ImmutableMap.Builder<String, String> defaultLabelsBuilder = new
ImmutableMap.Builder<>();
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_type",
"instance");
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_name",
workerName);
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_id", workerId);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/region", region);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/job_name", jobName);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/job_id", jobId);
+ defaultLabels = defaultLabelsBuilder.buildOrThrow();
+
+ ImmutableMap.Builder<String, String> resourceLabelBuilder = new
ImmutableMap.Builder<>();
+ resourceLabelBuilder.put("job_id", jobId);
+ resourceLabelBuilder.put("job_name", jobName);
+ resourceLabelBuilder.put("project_id", projectId);
+ resourceLabelBuilder.put("region", region);
+ // We add the step when constructing the resource as it can change.
+ defaultResourceLabels = resourceLabelBuilder.buildOrThrow();
+ steplessMonitoredResource =
+ MonitoredResource.newBuilder(RESOURCE_TYPE)
+ .setLabels(defaultResourceLabels)
+ .addLabel(STEP_RESOURCE_LABEL, "")
+ .build();
+ }
+
+ private static String middleCrop(String value, int maxSize) {
Review Comment:
can we use org.apache.commons.lang3.StringUtils.abbreviateMiddle instead?
##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowWorkerLoggingOptions.java:
##########
@@ -121,6 +153,48 @@ enum Level {
void setWorkerLogLevelOverrides(WorkerLogLevelOverrides value);
+ /**
+ * This option controls the direct log levels for specifically named
loggers. If a message is
+ * configured to be sent to both directly to cloud logging and default
disk-based logging it will
+ * just be sent to disk-based logging. If an override only exists for a
logger for direct logging,
+ * the --defaultWorkerLogLevel will be used for the non-direct configuration
for the logger.
+ *
+ * <p>Later options with equivalent names override earlier options.
+ *
+ * <p>See {@link WorkerLogLevelOverrides} for more information on how to
configure logging on a
+ * per {@link Class}, {@link Package}, or name basis. If used from the
command line, the expected
+ * format is {"Name":"Level",...}, further details on {@link
WorkerLogLevelOverrides#from}.
+ */
+ @Description(
+ "This option controls the direct log levels for specifically named
loggers. "
+ + "The expected format is {\"Name\":\"Level\",...}. The Dataflow
worker supports a logging "
+ + "hierarchy based off of names that are '.' separated. For example,
by specifying the value "
+ + "{\"a.b.c.Foo\":\"DEBUG\"}, the logger for the class 'a.b.c.Foo'
will be configured to "
+ + "output logs at the DEBUG level. Similarly, by specifying the
value {\"a.b.c\":\"WARN\"}, "
+ + "all loggers underneath the 'a.b.c' package will be configured to
output logs at the WARN "
+ + "level. System.out and System.err levels are configured via
loggers of the corresponding "
+ + "name. Also, note that when multiple overrides are specified, the
exact name followed by "
+ + "the closest parent takes precedence. Note that if an override is
just provided for the direct log level "
+ + "for a logger, the default non-direct log level will be used for
non-direct logs.")
+ WorkerLogLevelOverrides getWorkerDirectLogLevelOverrides();
+
+ void setWorkerDirectLogLevelOverrides(WorkerLogLevelOverrides value);
Review Comment:
do we need the ability to configure direct vs disk at the logger + logLevel
level?
I feel the additional complexity is making it hard to understand how the
different flags `getWorkerLogLevelOverrides`, `getSdkHarnessLogLevelOverrides`,
`getWorkerDirectLogLevelOverrides` interact with each other and when to use
what.
Can we make changing the logLevels orthogonal to changing disk vs direct?
    A boolean flag that turns on direct mode, plus the fallback to disk mode on
errors turned on by default.
=====
    It is not clear to me whether a user will want to send logs with different
logLevels from the same logger via two different paths. The disk path will
likely be more delayed than the direct path, and when looking at logs in Log
Explorer, the forwarding delay will introduce gaps (newer logs appearing
without older logs) that can confuse the user.
Enabling direct logging at the logger level could be useful, but it still
can produce gaps across different loggers in log explorer.
    Since all logs are going to the same destination, I think a boolean flag
enabling or disabling direct logging is simpler, with fewer gotchas.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -33,71 +51,145 @@
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
import java.util.Date;
-import java.util.EnumMap;
+import java.util.Enumeration;
+import java.util.HashMap;
import java.util.Map;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.logging.ErrorManager;
import java.util.logging.Handler;
+import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.SimpleFormatter;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import
org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerLoggingOptions;
import
org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.GceMetadataUtil;
+import org.apache.beam.sdk.options.PipelineOptions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CountingOutputStream;
import org.slf4j.MDC;
/**
* Formats {@link LogRecord} into JSON format for Cloud Logging. Any exception
is represented using
* {@link Throwable#printStackTrace()}.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
+@Internal
+@SuppressWarnings("method.invocation")
public class DataflowWorkerLoggingHandler extends Handler {
- private static final EnumMap<BeamFnApi.LogEntry.Severity.Enum, String>
- BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL;
-
- static {
- BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL = new
EnumMap<>(BeamFnApi.LogEntry.Severity.Enum.class);
- // Note that Google Cloud Logging only defines a fixed number of
severities and maps "TRACE"
- // onto "DEBUG" as seen here:
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
- //
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.TRACE,
"DEBUG");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.DEBUG,
"DEBUG");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.INFO,
"INFO");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.NOTICE,
"NOTICE");
- // Note that Google Cloud Logging only defines a fixed number of
severities and maps "WARN" onto
- // "WARNING" as seen here:
https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud
- //
/blob/8a3ba9d085702c13b4f203812ee5dffdaf99572a/lib/fluent/plugin/out_google_cloud.rb#L865
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.WARN,
"WARNING");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.ERROR,
"ERROR");
-
BEAM_LOG_LEVEL_TO_CLOUD_LOG_LEVEL.put(BeamFnApi.LogEntry.Severity.Enum.CRITICAL,
"CRITICAL");
- }
-
/**
* Buffer size to use when writing logs. This matches <a
* href="https://cloud.google.com/logging/quotas#log-limits">Logging usage
limits</a> to avoid
* spreading the same log entry across multiple disk flushes.
*/
private static final int LOGGING_WRITER_BUFFER_SIZE = 262144; // 256kb
+ // Used as a side-channel for a Logger for which the configured non-direct
logging level doesn't
+ // match the default
+ // logging level and which is using direct logging.
+ private static class DirectHintResourceBundle extends ResourceBundle {
+ private static final String LEVEL_KEY = "NonDirectLogLevel";
+ private final Level nonDirectLogLevel;
+
+ DirectHintResourceBundle(Level directLevel) {
+ this.nonDirectLogLevel = directLevel;
+ }
+
+ @Override
+ public String getBaseBundleName() {
+ return "DataflowWorkerLoggingHandler";
+ }
+
+ @Override
+ protected Object handleGetObject(@Nonnull String s) {
+ if (LEVEL_KEY.equals(s)) {
+ return nonDirectLogLevel;
+ }
+ return new MissingResourceException(
+ "The only valid key is " + LEVEL_KEY, this.getClass().getName(), s);
+ }
+
+ @Override
+ public @Nonnull Enumeration<String> getKeys() {
+ return Iterators.asEnumeration(Iterators.singletonIterator(LEVEL_KEY));
+ }
+ }
+ // Since there are just a couple possible levels, we cache them.
+ private static final ConcurrentHashMap<Level, ResourceBundle>
resourceBundles =
+ new ConcurrentHashMap<>();
+
+ @VisibleForTesting
+ static ResourceBundle resourceBundleForNonDirectLogLevelHint(Level
nonDirectLogLevel) {
+ return resourceBundles.computeIfAbsent(nonDirectLogLevel,
DirectHintResourceBundle::new);
+ }
+
/** If true, add SLF4J MDC to custom_data of the log message. */
+ @LazyInit private boolean logCustomMdc = false;
+
+ // All of the direct logging related fields are only initialized if
enableDirectLogging is called.
+ // Afterwards they
+ // are logically final.
+ @LazyInit private @Nullable Logging directLogging = null;
+ @LazyInit private boolean fallbackDirectErrorsToDisk = false;
+ @LazyInit private Level defaultNonDirectLogLevel = Level.ALL;
+ @LazyInit private Logging.WriteOption[] directWriteOptions = new
Logging.WriteOption[0];
+ @LazyInit private ImmutableMap<String, String> defaultResourceLabels =
ImmutableMap.of();
+
+ @LazyInit
+ private MonitoredResource steplessMonitoredResource =
+ MonitoredResource.newBuilder(RESOURCE_TYPE).build();
+
+ @LazyInit private ImmutableMap<String, String> defaultLabels =
ImmutableMap.of();
+ @LazyInit private @Nullable Consumer<LogEntry> testDirectLogInterceptor;
+
+ private static final String LOG_TYPE = "dataflow.googleapis.com%2Fworker";
Review Comment:
should this be "/" instead of "%2f"?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingHandler.java:
##########
@@ -131,28 +222,315 @@ public DataflowWorkerLoggingHandler(String filename,
long sizeLimit) throws IOEx
createOutputStream();
}
- public synchronized void setLogMdc(boolean enabled) {
- this.logCustomMdc = enabled;
+ public void setLogMdc(boolean enabled) {
+ logCustomMdc = enabled;
}
- @Override
- public synchronized void publish(LogRecord record) {
- DataflowExecutionState currrentDataflowState = null;
- ExecutionState currrentState =
ExecutionStateTracker.getCurrentExecutionState();
- if (currrentState instanceof DataflowExecutionState) {
- currrentDataflowState = (DataflowExecutionState) currrentState;
- }
- // It's okay to pass in the null state, publish() handles and tests this.
- publish(currrentDataflowState, record);
+ private void initializeLabelMaps(PipelineOptions options) {
+ DataflowPipelineOptions dataflowOptions =
options.as(DataflowPipelineOptions.class);
+ DataflowWorkerHarnessOptions harnessOptions =
options.as(DataflowWorkerHarnessOptions.class);
+ @Nullable String jobId = harnessOptions.getJobId();
+ if (jobId == null || jobId.isEmpty()) {
+ jobId = GceMetadataUtil.fetchDataflowJobId();
+ }
+ jobId = middleCrop(jobId, LABEL_MAX_LENGTH);
+
+ @Nullable String jobName = dataflowOptions.getJobName();
+ if (jobName == null || jobName.isEmpty()) {
+ jobName = GceMetadataUtil.fetchDataflowJobName();
+ }
+ jobName = middleCrop(jobName, LABEL_MAX_LENGTH);
+
+ String region = dataflowOptions.getRegion();
+ if (region.isEmpty()) {
+ region = GceMetadataUtil.fetchDataflowRegion();
+ }
+ region = middleCrop(region, LABEL_MAX_LENGTH);
+
+ // Note that the id in the options is a string name, not the numeric VM id.
+ @Nullable String workerName = harnessOptions.getWorkerId();
+ if (workerName == null || workerName.isEmpty()) {
+ workerName = GceMetadataUtil.fetchDataflowWorkerName();
+ }
+ workerName = middleCrop(workerName, LABEL_MAX_LENGTH);
+
+ String workerId = middleCrop(GceMetadataUtil.fetchDataflowWorkerId(),
LABEL_MAX_LENGTH);
+
+ @Nullable String projectId = harnessOptions.getProject();
+ if (projectId == null || projectId.isEmpty()) {
+ projectId = checkNotNull(ServiceOptions.getDefaultProjectId());
+ }
+ projectId = middleCrop(projectId, LABEL_MAX_LENGTH);
+
+ ImmutableMap.Builder<String, String> defaultLabelsBuilder = new
ImmutableMap.Builder<>();
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_type",
"instance");
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_name",
workerName);
+ defaultLabelsBuilder.put("compute.googleapis.com/resource_id", workerId);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/region", region);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/job_name", jobName);
+ defaultLabelsBuilder.put("dataflow.googleapis.com/job_id", jobId);
+ defaultLabels = defaultLabelsBuilder.buildOrThrow();
+
+ ImmutableMap.Builder<String, String> resourceLabelBuilder = new
ImmutableMap.Builder<>();
+ resourceLabelBuilder.put("job_id", jobId);
+ resourceLabelBuilder.put("job_name", jobName);
+ resourceLabelBuilder.put("project_id", projectId);
+ resourceLabelBuilder.put("region", region);
+ // We add the step when constructing the resource as it can change.
+ defaultResourceLabels = resourceLabelBuilder.buildOrThrow();
+ steplessMonitoredResource =
+ MonitoredResource.newBuilder(RESOURCE_TYPE)
+ .setLabels(defaultResourceLabels)
+ .addLabel(STEP_RESOURCE_LABEL, "")
+ .build();
+ }
+
+ private static String middleCrop(String value, int maxSize) {
+ if (value.length() <= maxSize) {
+ return value;
+ }
+ if (maxSize < 3) {
+ return value.substring(0, maxSize);
+ }
+ int firstHalfSize = (maxSize - 2) / 2;
+ int secondHalfSize = (maxSize - 3) / 2;
+ return value.substring(0, firstHalfSize)
+ + "..."
+ + value.substring(value.length() - secondHalfSize);
}
- public synchronized void publish(DataflowExecutionState
currentExecutionState, LogRecord record) {
- if (!isLoggable(record)) {
+ private static Severity severityFor(Level level) {
+ if (level instanceof LoggingLevel) {
+ return ((LoggingLevel) level).getSeverity();
+ }
+ // Choose the severity based on Level range, rounding down.
+ // The assumption is that Level values below maintain same numeric value
+ int value = level.intValue();
+ if (value < Level.INFO.intValue()) {
+ // Includes Level.CONFIG, Level.FINE, Level.FINER, Level.FINEST
+ return Severity.DEBUG;
+ } else if (value < Level.WARNING.intValue()) {
+ return Severity.INFO;
+ } else if (value < Level.SEVERE.intValue()) {
+ return Severity.WARNING;
+ } else if (value < Level.OFF.intValue()) {
+ return Severity.ERROR;
+ }
+ // Level.OFF is special meaning not to log.
+ return Severity.NONE;
+ }
+
+ private void addLogField(
+ Struct.Builder builder, String field, @Nullable String value, int
maxSize) {
+ if (value == null || value.isEmpty()) {
return;
}
+ builder.putFieldsBuilderIfAbsent(field).setStringValue(middleCrop(value,
maxSize));
+ }
+
+ @VisibleForTesting
+ LogEntry constructDirectLogEntry(
+ LogRecord record, @Nullable DataflowExecutionState executionState) {
+ Struct.Builder payloadBuilder = Struct.newBuilder();
+ addLogField(payloadBuilder, "logger", record.getLoggerName(),
FIELD_MAX_LENGTH);
+ addLogField(
+ payloadBuilder, "message", getFormatter().formatMessage(record),
MESSAGE_MAX_LENGTH);
+ addLogField(
+ payloadBuilder, "exception", formatException(record.getThrown()),
MESSAGE_MAX_LENGTH);
+ addLogField(payloadBuilder, "thread",
String.valueOf(record.getThreadID()), FIELD_MAX_LENGTH);
+ addLogField(payloadBuilder, "stage",
DataflowWorkerLoggingMDC.getStageName(), FIELD_MAX_LENGTH);
+ addLogField(payloadBuilder, "worker",
DataflowWorkerLoggingMDC.getWorkerId(), FIELD_MAX_LENGTH);
+ addLogField(payloadBuilder, "work", DataflowWorkerLoggingMDC.getWorkId(),
FIELD_MAX_LENGTH);
+ addLogField(payloadBuilder, "job", DataflowWorkerLoggingMDC.getJobId(),
FIELD_MAX_LENGTH);
+ if (logCustomMdc) {
+ @Nullable Map<String, String> mdcMap = MDC.getCopyOfContextMap();
+ if (mdcMap != null && !mdcMap.isEmpty()) {
+ Struct.Builder customDataBuilder =
+
payloadBuilder.putFieldsBuilderIfAbsent("custom_data").getStructValueBuilder();
+ mdcMap.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey())
+ .forEach(
+ (entry) -> {
+ if (entry.getKey() != null) {
+ addLogField(
+ customDataBuilder, entry.getKey(), entry.getValue(),
FIELD_MAX_LENGTH);
+ }
+ });
+ }
+ }
+
+ @Nullable String stepId = null;
+ if (executionState != null) {
+ @Nullable NameContext nameContext = executionState.getStepName();
+ if (nameContext != null) {
+ stepId = nameContext.userName();
+ }
+ }
+ addLogField(payloadBuilder, "step", stepId, FIELD_MAX_LENGTH);
+
+ LogEntry.Builder builder =
+ LogEntry.newBuilder(Payload.JsonPayload.of(payloadBuilder.build()))
+ .setTimestamp(Instant.ofEpochMilli(record.getMillis()))
+ .setSeverity(severityFor(record.getLevel()));
+
+ if (stepId != null) {
+ builder.setResource(
+ MonitoredResource.newBuilder(RESOURCE_TYPE)
+ .setLabels(defaultResourceLabels)
+ .addLabel(STEP_RESOURCE_LABEL, stepId)
+ .build());
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Enables logging of records directly to Cloud Logging instead of logging
to the disk. Should
+ * only be called once.
+ *
+ * @param pipelineOptions the pipelineOptions, used to configure buffers etc.
+ * @param defaultNonDirectLogLevel the default level at which logs should be
logged to disk.
+ * LogEntries that are below this level will be logged directly to Cloud
Logging. The behavior
+ * for a specific LogEntry can be overridden by attaching a
ResourceBundle obtained from
+ * resourceBundleForNonDirectLogLevelHint to it.
+ */
+ synchronized void enableDirectLogging(
+ PipelineOptions pipelineOptions,
+ Level defaultNonDirectLogLevel,
+ @Nullable Consumer<LogEntry> testDirectLogInterceptor) {
+ checkState(
+ directLogging == null && this.testDirectLogInterceptor == null,
+ "enableDirectLogging should only be called once on a
DataflowWorkerLoggingHandler");
+ initializeLabelMaps(pipelineOptions);
+
+ DataflowWorkerLoggingOptions dfLoggingOptions =
+ pipelineOptions.as(DataflowWorkerLoggingOptions.class);
+ this.fallbackDirectErrorsToDisk =
+
Boolean.TRUE.equals(dfLoggingOptions.getDirectLoggingFallbackToDiskOnErrors());
+ directThrottler.setCooldownDuration(
+
Duration.ofSeconds(dfLoggingOptions.getDirectLoggingCooldownSeconds()));
+
+ if (testDirectLogInterceptor == null) {
+ // Override some of the default settings.
+ LoggingOptions cloudLoggingOptions =
+ LoggingOptions.newBuilder()
+ .setBatchingSettings(
+ BatchingSettings.newBuilder()
+ .setFlowControlSettings(
+ FlowControlSettings.newBuilder()
+ .setLimitExceededBehavior(
+
FlowController.LimitExceededBehavior.ThrowException)
+ .setMaxOutstandingRequestBytes(
+
dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit())
+ .setMaxOutstandingElementCount(
+
dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit())
+ .build())
+ // These thresholds match the default settings in
+ // LoggingServiceV2StubSettings.java for
writeLogEntries. We must re-set them
+ // when we override the flow control settings because
otherwise
+ // BatchingSettings defaults to no batching.
+ .setElementCountThreshold(
+ Math.min(
+ 1000L,
+ Math.max(
+ 1L,
+
dfLoggingOptions.getWorkerDirectLoggerBufferElementLimit() / 2)))
+ .setRequestByteThreshold(
+ Math.min(
+ 1048576L,
+ Math.max(
+ 1,
dfLoggingOptions.getWorkerDirectLoggerBufferByteLimit() / 2)))
+ .setDelayThresholdDuration(Duration.ofMillis(50L))
+ .build())
+ .build();
+ ArrayList<Logging.WriteOption> writeOptions = new ArrayList<>();
+ writeOptions.add(Logging.WriteOption.labels(defaultLabels));
+ writeOptions.add(Logging.WriteOption.logName(LOG_TYPE));
+
writeOptions.add(Logging.WriteOption.resource(steplessMonitoredResource));
+ this.directWriteOptions = Iterables.toArray(writeOptions,
Logging.WriteOption.class);
+
+ Logging directLogging = cloudLoggingOptions.getService();
+ directLogging.setFlushSeverity(Severity.NONE);
+ directLogging.setWriteSynchronicity(Synchronicity.ASYNC);
+
+ this.directLogging = directLogging;
+ } else {
+ this.testDirectLogInterceptor = testDirectLogInterceptor;
+ }
+ this.defaultNonDirectLogLevel = defaultNonDirectLogLevel;
+ }
+
+ private static @Nullable DataflowExecutionState
getCurrentDataflowExecutionState() {
+ @Nullable ExecutionState currentState =
ExecutionStateTracker.getCurrentExecutionState();
+ if (!(currentState instanceof DataflowExecutionState)) {
+ return null;
+ }
+ return (DataflowExecutionState) currentState;
+ }
+
+ @Override
+ public void publish(@Nullable LogRecord record) {
+ if (record == null) {
+ return;
+ }
+ publish(getCurrentDataflowExecutionState(), record);
+ }
+
+ public void publish(@Nullable DataflowExecutionState executionState,
LogRecord record) {
+ boolean isDirectLog = isConfiguredDirectLog(record);
+ if (isDirectLog) {
+ if (directThrottler.shouldAttemptDirectLog()) {
+ try {
+ LogEntry logEntry = constructDirectLogEntry(record, executionState);
+ if (testDirectLogInterceptor != null) {
+ // The default labels are applied by write options generally bute
we merge them in here
+ // so they are visible to the test.
+ HashMap<String, String> mergedLabels = new
HashMap<>(defaultLabels);
+ mergedLabels.putAll(logEntry.getLabels());
+ logEntry = logEntry.toBuilder().setLabels(mergedLabels).build();
+ if (logEntry.getResource() == null) {
+ logEntry =
logEntry.toBuilder().setResource(steplessMonitoredResource).build();
+ }
+ checkNotNull(testDirectLogInterceptor).accept(logEntry);
+ } else {
+ checkNotNull(directLogging).write(ImmutableList.of(logEntry),
directWriteOptions);
+ }
+ directThrottler.noteDirectLoggingEnqueueSuccess();
+ return;
+ } catch (RuntimeException e) {
+ @Nullable String log =
directThrottler.noteDirectLoggingEnqueueFailure();
+ if (log != null) {
+ printErrorToDisk(log, e);
+ }
+ }
+ }
+
+ // Either we were throttled or encountered an error enqueuing the log.
+ if (!fallbackDirectErrorsToDisk) {
+ return;
Review Comment:
Would be useful to add a throttled disk log here saying we are dropping
direct logs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]