[
https://issues.apache.org/jira/browse/BEAM-14539?focusedWorklogId=776984&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-776984
]
ASF GitHub Bot logged work on BEAM-14539:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 01/Jun/22 16:11
Start Date: 01/Jun/22 16:11
Worklog Time Spent: 10m
Work Description: lukecwik commented on code in PR #17787:
URL: https://github.com/apache/beam/pull/17787#discussion_r886989139
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java:
##########
@@ -134,6 +136,44 @@ public void testLogRawBytes() {
assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg));
}
+ @Test
+ public void testLogRawBytesLarge() {
+ PrintStream printStream = createPrintStreamAdapter();
+ String msg = "♠ ♡ ♢ ♣ ♤ ♥ ♦ ♧";
+ for (int i = 0; i < 10; ++i) {
+ msg = msg + msg;
+ }
+ byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
+ printStream.write(bytes, 0, 1);
+ printStream.write(bytes, 1, 4);
+ printStream.write(bytes, 5, 15);
+ assertThat(handler.getLogs(), is(empty()));
+
+ // We expect that when the buffer is full we flush
+ printStream.write(bytes, 20, 1000);
+ printStream.write(bytes, 1020, 1000);
+ printStream.write(bytes, 2020, 1000);
+ int numLogs = handler.getLogs().size();
+ assertThat(handler.getLogs(), is(not(empty())));
+
+ // We expect that when we decode large messages we flush as well
+ printStream.write(bytes, 3020, bytes.length - 3020);
+ assertThat(numLogs, is(lessThan(handler.getLogs().size())));
+ numLogs = handler.getLogs().size();
+
+ // We expect new lines to cause a flush
+ String newlineMsg = "♠ ♡ \n♦ ♧";
+ byte[] newlineMsgBytes = newlineMsg.getBytes(StandardCharsets.UTF_8);
+ printStream.write(newlineMsgBytes, 0, newlineMsgBytes.length);
+ assertThat(numLogs, is(lessThan(handler.getLogs().size())));
+
+ StringBuilder actualMessages = new StringBuilder();
+ for (LogRecord logRecord : handler.getLogs()) {
+ actualMessages.append(logRecord.getMessage());
+ }
+ assertThat(actualMessages.toString(), containsString(msg + newlineMsg));
Review Comment:
Done; I had to expose the output warning since it is the prefix.
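The chunk boundaries in testLogRawBytesLarge are easier to follow with a quick byte count. Each symbol in the message (U+2660 to U+2667) is a 3-byte UTF-8 sequence, so the 15-character string is 31 bytes, and after 10 doublings the input is 31 × 1024 = 31,744 bytes. The helper below is a hypothetical illustration, not part of the PR: `SplitBoundaries` and its methods are made-up names. It checks which write offsets land in the middle of a character by testing for a UTF-8 continuation byte (bit pattern `10xxxxxx`).

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper (not part of the PR): reports which write offsets used by
 * testLogRawBytesLarge fall inside a multi-byte UTF-8 character.
 */
class SplitBoundaries {
  /** Rebuilds the test input: the 31-byte symbol string doubled 10 times. */
  static byte[] testBytes() {
    String msg = "\u2660 \u2661 \u2662 \u2663 \u2664 \u2665 \u2666 \u2667";
    for (int i = 0; i < 10; ++i) {
      msg = msg + msg;
    }
    return msg.getBytes(StandardCharsets.UTF_8);
  }

  /** A boundary splits a character iff the byte at it is a continuation byte. */
  static boolean splitsCharacter(byte[] bytes, int offset) {
    return (bytes[offset] & 0xC0) == 0x80;
  }

  public static void main(String[] args) {
    byte[] bytes = testBytes();
    System.out.println("total bytes: " + bytes.length);
    for (int offset : new int[] {1, 5, 20, 1020, 2020, 3020}) {
      System.out.println(offset + " splits a character: " + splitsCharacter(bytes, offset));
    }
  }
}
```

By this count, offsets 1, 5, 2020 and 3020 fall inside a character while 20 and 1020 fall on character starts, so several of the test's writes end with a partial sequence that has to be carried over to the next write.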
Issue Time Tracking
-------------------
Worklog Id: (was: 776984)
Time Spent: 1h (was: 50m)
> JulHandlerPrintStream failing to buffer carry over bytes
> --------------------------------------------------------
>
> Key: BEAM-14539
> URL: https://issues.apache.org/jira/browse/BEAM-14539
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Affects Versions: 2.33.0, 2.34.0, 2.35.0, 2.36.0, 2.37.0, 2.38.0, 2.39.0
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Fix For: 2.40.0
>
> Time Spent: 1h
> Remaining Estimate: 0h
>
> JulHandlerPrintStream was not flushing in a loop for large byte arrays, which
> meant that the carry-over could be much larger than the assumed 6 bytes.
> We saw these logs for a job:
> {noformat}
> java.lang.ArrayIndexOutOfBoundsException: Index 9 out of bounds for length 6
> at org.apache.beam.runners.dataflow.worker.logging.JulHandlerPrintStreamAdapterFactory$JulHandlerPrintStream.write(JulHandlerPrintStreamAdapterFactory.java:142)
> at java.base/java.io.PrintStream.write(PrintStream.java:559)
> at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
> at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
> at java.base/sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:104)
> at java.base/java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:184)
> at java.base/java.io.PrintStream.write(PrintStream.java:606)
> at java.base/java.io.PrintStream.print(PrintStream.java:745)
> at java.base/java.io.PrintStream.append(PrintStream.java:1147)
> at java.base/java.io.PrintStream.append(PrintStream.java:1188)
> at java.base/java.io.PrintStream.append(PrintStream.java:63)
> at java.base/java.util.Formatter$FixedString.print(Formatter.java:2754)
> at java.base/java.util.Formatter.format(Formatter.java:2661)
> at java.base/java.io.PrintStream.format(PrintStream.java:1053)
> at java.base/java.io.PrintStream.printf(PrintStream.java:949)
> ...
> {noformat}
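The carry-over problem described in the issue can be illustrated with a minimal standalone sketch. It assumes a plain `CharsetDecoder`-based design and is not Beam's actual JulHandlerPrintStream code; the class `ChunkedUtf8Decoder` and its methods are hypothetical. It keeps a fixed 6-byte carry-over buffer, as the issue describes, but calls `CharsetDecoder.decode` in a loop and drains the output after every `OVERFLOW`. As a result, only an incomplete trailing UTF-8 sequence (at most 3 bytes) is ever carried over to the next write.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch, not Beam's JulHandlerPrintStream: decodes UTF-8 bytes that
 * arrive in arbitrary chunks, carrying over only an incomplete trailing character.
 */
class ChunkedUtf8Decoder {
  private final CharsetDecoder decoder =
      StandardCharsets.UTF_8
          .newDecoder()
          .onMalformedInput(CodingErrorAction.REPLACE)
          .onUnmappableCharacter(CodingErrorAction.REPLACE);
  // Fixed-size carry-over, mirroring the "assumed 6 bytes" from the issue.
  private final byte[] carryOver = new byte[6];
  private int carryOverLength = 0;
  // Deliberately small so that large writes need several decode calls.
  private final CharBuffer out = CharBuffer.allocate(256);
  private final StringBuilder decoded = new StringBuilder();

  void write(byte[] b, int off, int len) {
    // Prepend the bytes carried over from the previous write.
    ByteBuffer in = ByteBuffer.allocate(carryOverLength + len);
    in.put(carryOver, 0, carryOverLength);
    in.put(b, off, len);
    in.flip();
    // Keep decoding until UNDERFLOW: OVERFLOW means the output buffer filled up,
    // so drain it and decode the rest of the input instead of stopping early.
    CoderResult result;
    do {
      result = decoder.decode(in, out, false);
      drain();
    } while (result.isOverflow());
    // After UNDERFLOW only an incomplete UTF-8 sequence (at most 3 bytes) can
    // remain, so it always fits in the 6-byte carry-over buffer.
    carryOverLength = in.remaining();
    in.get(carryOver, 0, carryOverLength);
  }

  /** Moves decoded characters from the output buffer into the accumulated text. */
  private void drain() {
    out.flip();
    decoded.append(out.toString());
    out.clear();
  }

  String decoded() {
    return decoded.toString();
  }

  int pendingBytes() {
    return carryOverLength;
  }
}
```

If the sketch instead decoded only once per write, a large write would return `OVERFLOW` with most of its input unconsumed. Copying that remainder into the 6-byte buffer would then fail with an out-of-bounds error, similar to the `ArrayIndexOutOfBoundsException` in the trace above.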