Repository: beam Updated Branches: refs/heads/master 2d3e9fe75 -> 49067b164
[BEAM-1347] Remove the usage of a thread local on a potentially hot path Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/60779e2e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/60779e2e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/60779e2e Branch: refs/heads/master Commit: 60779e2ecd76f1cb4766050e4560765c1bc3c19b Parents: 2d3e9fe Author: Luke Cwik <[email protected]> Authored: Tue May 30 13:15:31 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Tue May 30 14:15:23 2017 -0700 ---------------------------------------------------------------------- .../fn/harness/logging/BeamFnLoggingClient.java | 36 +++++++++++--------- 1 file changed, 19 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/60779e2e/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index c8d11ed..d56ee6d 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -38,7 +38,6 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; import java.util.logging.Formatter; import java.util.logging.Handler; @@ -179,11 +178,14 @@ public class BeamFnLoggingClient implements AutoCloseable { private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries = new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT); private final Future<?> bufferedLogWriter; - private final ThreadLocal<Consumer<BeamFnApi.LogEntry>> logEntryHandler; + /** + * Safe object publishing is not required since we only care if the thread that set + * this field is equal to the thread also attempting to add a log entry. + */ + private Thread logEntryHandlerThread; private LogRecordHandler(ExecutorService executorService) { bufferedLogWriter = executorService.submit(this); - logEntryHandler = new ThreadLocal<>(); } @Override @@ -204,19 +206,18 @@ public class BeamFnLoggingClient implements AutoCloseable { builder.setTrace(getStackTraceAsString(record.getThrown())); } // The thread that sends log records should never perform a blocking publish and - // only insert log records best effort. We detect which thread is logging - // by using the thread local, defaulting to the blocking publish. - MoreObjects.firstNonNull( - logEntryHandler.get(), this::blockingPublish).accept(builder.build()); - } - - /** Blocks caller till enough space exists to publish this log entry. */ - private void blockingPublish(BeamFnApi.LogEntry logEntry) { - try { - bufferedLogEntries.put(logEntry); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); + // only insert log records best effort. + if (Thread.currentThread() != logEntryHandlerThread) { + // Blocks caller till enough space exists to publish this log entry. + try { + bufferedLogEntries.put(builder.build()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } else { + // Never blocks caller, will drop log message if buffer is full. + bufferedLogEntries.offer(builder.build()); } } @@ -225,7 +226,8 @@ public class BeamFnLoggingClient implements AutoCloseable { // Logging which occurs in this thread will attempt to publish log entries into the // above handler which should never block if the queue is full otherwise // this thread will get stuck. - logEntryHandler.set(bufferedLogEntries::offer); + logEntryHandlerThread = Thread.currentThread(); + List<BeamFnApi.LogEntry> additionalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT); try {
