[ 
https://issues.apache.org/jira/browse/BEAM-9399?focusedWorklogId=401712&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-401712
 ]

ASF GitHub Bot logged work on BEAM-9399:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Mar/20 19:28
            Start Date: 11/Mar/20 19:28
    Worklog Time Spent: 10m 
      Work Description: scwhittle commented on pull request #11096: [BEAM-9399] Change the redirection of System.err to be a custom PrintStream
URL: https://github.com/apache/beam/pull/11096#discussion_r391210501
 
 

 ##########
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java
 ##########
 @@ -37,114 +37,272 @@
 class JulHandlerPrintStreamAdapterFactory {
   private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
 
-  /**
-   * Creates a {@link PrintStream} which redirects all output to the JUL 
{@link Handler} with the
-   * specified {@code loggerName} and {@code level}.
-   */
-  static PrintStream create(Handler handler, String loggerName, Level 
messageLevel) {
-    try {
-      return new PrintStream(
-          new JulHandlerAdapterOutputStream(handler, loggerName, messageLevel),
-          false,
-          StandardCharsets.UTF_8.name());
-    } catch (UnsupportedEncodingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * An output stream adapter which is able to take a stream of UTF-8 data and 
output to a named JUL
-   * log handler. The log messages will be buffered until the system dependent 
new line separator is
-   * seen, at which point the buffered string will be output.
-   */
-  private static class JulHandlerAdapterOutputStream extends OutputStream {
+  private static class JulHandlerPrintStream extends PrintStream {
     private static final String LOGGING_DISCLAIMER =
         String.format(
             "Please use a logger instead of System.out or System.err.%n"
                 + "Please switch to using org.slf4j.Logger.%n"
                 + "See: https://cloud.google.com/dataflow/pipelines/logging");
-    // This limits the number of bytes which we buffer in case we don't see a 
newline character.
-    private static final int BUFFER_LIMIT = 1 << 14; // 16384 bytes
-    private static final byte[] NEW_LINE = 
System.lineSeparator().getBytes(StandardCharsets.UTF_8);
+    // This limits the number of bytes which we buffer in case we don't have a flush.
+    private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
 
     /** Hold reference of named logger to check configured {@link Level}. */
     private Logger logger;
 
     private Handler handler;
     private String loggerName;
-    private ByteArrayOutputStream baos;
+    private StringBuilder buffer;
     private Level messageLevel;
-    private int matched = 0;
 
-    private JulHandlerAdapterOutputStream(Handler handler, String loggerName, 
Level logLevel) {
+    private JulHandlerPrintStream(Handler handler, String loggerName, Level logLevel) {
+      super(
+          new OutputStream() {
+            @Override
+            public void write(int i) throws IOException {
+              throw new RuntimeException("All methods should be overwritten so this is unused");
+            }
+          });
       this.handler = handler;
       this.loggerName = loggerName;
       this.messageLevel = logLevel;
       this.logger = Logger.getLogger(loggerName);
-      this.baos = new ByteArrayOutputStream(BUFFER_LIMIT);
+      this.buffer = new StringBuilder(BUFFER_LIMIT);
     }
 
     @Override
-    public void write(int b) {
-      if (outputWarning.compareAndSet(false, true)) {
-        publish(Level.WARNING, LOGGING_DISCLAIMER);
+    public void flush() {
+      publish(flushToString());
+    }
+
+    private synchronized String flushToString() {
+      if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\n') {
+        buffer.setLength(buffer.length() - 1);
       }
+      String result = buffer.toString();
+      buffer.setLength(0);
+      return result;
+    }
 
-      baos.write(b);
-      // Check to see if the next byte matches further into new line string.
-      if (NEW_LINE[matched] == b) {
-        matched += 1;
-        // If we have matched the entire new line, output the contents of the 
buffer.
-        if (matched == NEW_LINE.length) {
-          output();
-        }
-      } else {
-        // Reset the match
-        matched = 0;
+    @Override
+    public void close() {
+      flush();
+    }
+
+    @Override
+    public boolean checkError() {
+      return false;
+    }
+
+    @Override
+    public synchronized void write(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public void write(byte[] a, int start, int limit) {
+      // XXX this enforces decoding on boundaries where before it didn't, does that matter?
+      print(new String(a, start, limit, Charset.defaultCharset()));
+    }
+
+    @Override
+    public synchronized void print(boolean b) {
+      buffer.append(b ? "true" : "false");
+    }
+
+    @Override
+    public synchronized void print(char c) {
+      buffer.append(c);
+    }
+
+    @Override
+    public synchronized void print(int i) {
+      buffer.append(i);
+    }
+
+    @Override
+    public synchronized void print(long l) {
+      buffer.append(l);
+    }
+
+    @Override
+    public synchronized void print(float f) {
+      buffer.append(f);
+    }
+
+    @Override
+    public synchronized void print(double d) {
+      buffer.append(d);
+    }
+
+    @Override
+    public synchronized void print(char[] a) {
+      buffer.append(a);
+    }
+
+    @Override
+    public synchronized void print(String s) {
+      buffer.append(s);
+    }
+
+    @Override
+    public synchronized void print(Object o) {
+      buffer.append(o);
+    }
+
+    @Override
+    public void println() {
+      flush();
+    }
+
+    @Override
+    public void println(boolean b) {
+      String msg;
+      synchronized (this) {
+        buffer.append(b);
+        msg = flushToString();
       }
-      if (baos.size() == BUFFER_LIMIT) {
-        output();
+      publish(msg);
+    }
+
+    @Override
+    public void println(char c) {
+      String msg;
+      synchronized (this) {
+        buffer.append(c);
+        msg = flushToString();
       }
+      publish(msg);
     }
 
     @Override
-    public void flush() throws IOException {
-      output();
+    public void println(int i) {
+      String msg;
+      synchronized (this) {
+        buffer.append(i);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
     @Override
-    public void close() throws IOException {
-      output();
+    public void println(long l) {
+      String msg;
+      synchronized (this) {
+        buffer.append(l);
+        msg = flushToString();
+      }
+      publish(msg);
     }
 
-    private void output() {
-      // If nothing was output, do not log anything
-      if (baos.size() == 0) {
-        return;
+    @Override
+    public void println(float f) {
+      String msg;
+      synchronized (this) {
+        buffer.append(f);
+        msg = flushToString();
       }
-      try {
-        String message = baos.toString(StandardCharsets.UTF_8.name());
-        // Strip the new line if it exists
-        if (message.endsWith(System.lineSeparator())) {
-          message = message.substring(0, message.length() - 
System.lineSeparator().length());
-        }
+      publish(msg);
+    }
+
+    @Override
+    public void println(double d) {
+      String msg;
+      synchronized (this) {
+        buffer.append(d);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(char[] a) {
+      String msg;
+      synchronized (this) {
+        buffer.append(a);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
+
+    @Override
+    public void println(String s) {
+      String msg;
+      synchronized (this) {
+        buffer.append(s);
+        msg = flushToString();
+      }
+      publish(msg);
+    }
 
-        publish(messageLevel, message);
-      } catch (UnsupportedEncodingException e) {
-        publish(
-            Level.SEVERE, String.format("Unable to decode string output to 
stdout/stderr %s", e));
+    @Override
+    public void println(Object o) {
+      String msg;
+      synchronized (this) {
+        buffer.append(o);
+        msg = flushToString();
       }
-      matched = 0;
-      baos.reset();
+      publish(msg);
     }
 
-    private void publish(Level level, String message) {
-      if (logger.isLoggable(level)) {
-        LogRecord log = new LogRecord(level, message);
+    @Override
+    public PrintStream format(String format, Object... args) {
+      return format(Locale.getDefault(), format, args);
+    }
+
+    @Override
+    public PrintStream format(Locale locale, String format, Object... args) {
+      String flushed;
+      int newlineIndex;
+      synchronized (this) {
+        int startLength = buffer.length();
+        Formatter formatter = new Formatter(buffer, locale);
+        formatter.format(format, args);
+        newlineIndex = buffer.indexOf("\n", startLength);
+        if (newlineIndex < 0) {
+          return this;
+        }
+        flushed = flushToString();
+      }
+      while (newlineIndex > 0) {
+        publish(flushed.substring(0, newlineIndex));
+        flushed = flushed.substring(newlineIndex + 1);
+        newlineIndex = flushed.indexOf('\n');
+      }
+      publish(flushed);
+      return this;
+    }
+
+    @Override
+    public synchronized PrintStream append(CharSequence cs, int start, int limit) {
+      buffer.append(cs.subSequence(start, limit));
+      return this;
+    }
+
+    // Note to avoid a deadlock, publish may never be called synchronized. See BEAM-9399.
+    private void publish(Level messageLevel, String message) {
+      if (logger.isLoggable(messageLevel)) {
 
 Review comment:
   Done
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 401712)
    Time Spent: 2h 10m  (was: 2h)

> Possible deadlock between DataflowWorkerLoggingHandler and overridden 
> System.err PrintStream
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-9399
>                 URL: https://issues.apache.org/jira/browse/BEAM-9399
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>            Reporter: Sam Whittle
>            Assignee: Sam Whittle
>            Priority: Minor
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> When an exception is encountered in DataflowWorkerLoggingHandler the 
> ErrorManager is used to log the exception.  ErrorManager uses System.err 
> which is overridden to be a PrintStream that writes back into 
> DataflowWorkerLoggingHandler.
> This has the lock ordering DataflowWorkerLoggingHandler -> PrintStream.
> Other logging of System.err has the inverse lock ordering 
> PrintStream->DataflowWorkerLoggingHandler so there is potential for deadlock.
> This is one known cause of the inversion, but any other System.err logs from 
> inside DataflowWorkerLoggingHandler could cause the same issue.
> The proposed fix addresses the low-hanging fruit by having ErrorManager output to 
> the original System.err.  A full fix would be to improve our override of 
> System.err to a PrintStream that can detect the locking inversion, or possibly 
> to use the PrintStream mutex in both cases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to