scwhittle commented on code in PR #17787:
URL: https://github.com/apache/beam/pull/17787#discussion_r886517818


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java:
##########
@@ -65,8 +65,7 @@ private static class JulHandlerPrintStream extends PrintStream {
     private final Level messageLevel;
     private final CharsetDecoder decoder;

Review Comment:
   Would it be good to annotate these fields to indicate that they are guarded?
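
One way such an annotation could look is sketched below. The class and field names are hypothetical and are not taken from this PR. The `GuardedBy` annotation is declared locally only so the sketch compiles on its own; a real change would presumably import an existing one such as `javax.annotation.concurrent.GuardedBy`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in so the sketch compiles without extra dependencies
// (assumption: the real code would import an existing annotation instead).
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.SOURCE)
@interface GuardedBy {
  String value();
}

// Hypothetical class, not the one touched by this PR.
class GuardedFieldsSketch {
  @GuardedBy("this")
  private final StringBuilder buffer = new StringBuilder();

  @GuardedBy("this")
  private int flushCount = 0;

  // Every access to the annotated fields happens while holding `this`.
  synchronized void write(String s) {
    buffer.append(s);
    if (s.indexOf('\n') >= 0) {
      // A newline ends the pending message: drop the buffer and count a flush.
      buffer.setLength(0);
      flushCount++;
    }
  }

  synchronized int flushCount() {
    return flushCount;
  }
}
```

The annotation does not change behavior. It documents which lock protects each field and lets static analysis flag unsynchronized access.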



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactory.java:
##########
@@ -130,46 +128,50 @@ public synchronized void write(int i) {
     @Override
     public void write(byte[] a, int offset, int length) {
       ByteBuffer incoming = ByteBuffer.wrap(a, offset, length);
+      assert incoming.hasArray();

Review Comment:
   Could we return early when length == 0? It is not obvious that this 
assertion would hold in that case.
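
A minimal sketch of the early return, using a hypothetical stand-in class rather than the actual stream from this PR. As far as I can tell from the `ByteBuffer.wrap` documentation, the returned buffer is backed by the given array, so `hasArray()` should also hold for an empty range; the guard mainly makes that case explicit and skips the remaining work.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the stream in this PR; only the guard is the point.
class EarlyReturnSketch {
  private int processedCalls = 0;
  private int processedBytes = 0;

  public synchronized void write(byte[] a, int offset, int length) {
    if (length == 0) {
      // Nothing to decode, so skip wrapping and everything after it.
      return;
    }
    ByteBuffer incoming = ByteBuffer.wrap(a, offset, length);
    processedCalls++;
    processedBytes += incoming.remaining();
  }

  synchronized int processedCalls() {
    return processedCalls;
  }

  synchronized int processedBytes() {
    return processedBytes;
  }
}
```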



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/logging/JulHandlerPrintStreamAdapterFactoryTest.java:
##########
@@ -134,6 +136,44 @@ public void testLogRawBytes() {
     assertThat(handler.getLogs(), hasLogItem(msg + newlineMsg));
   }
 
+  @Test
+  public void testLogRawBytesLarge() {
+    PrintStream printStream = createPrintStreamAdapter();
+    String msg = "♠ ♡ ♢ ♣ ♤ ♥ ♦ ♧";
+    for (int i = 0; i < 10; ++i) {
+      msg = msg + msg;
+    }
+    byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
+    printStream.write(bytes, 0, 1);
+    printStream.write(bytes, 1, 4);
+    printStream.write(bytes, 5, 15);
+    assertThat(handler.getLogs(), is(empty()));
+
+    // We expect that when the buffer is full we flush
+    printStream.write(bytes, 20, 1000);
+    printStream.write(bytes, 1020, 1000);
+    printStream.write(bytes, 2020, 1000);
+    int numLogs = handler.getLogs().size();
+    assertThat(handler.getLogs(), is(not(empty())));
+
+    // We expect that when we decode large messages we flush as well
+    printStream.write(bytes, 3020, bytes.length - 3020);
+    assertThat(numLogs, is(lessThan(handler.getLogs().size())));
+    numLogs = handler.getLogs().size();
+
+    // We expect new lines to cause a flush
+    String newlineMsg = "♠ ♡ \n♦ ♧";
+    byte[] newlineMsgBytes = newlineMsg.getBytes(StandardCharsets.UTF_8);
+    printStream.write(newlineMsgBytes, 0, newlineMsgBytes.length);
+    assertThat(numLogs, is(lessThan(handler.getLogs().size())));
+
+    StringBuilder actualMessages = new StringBuilder();
+    for (LogRecord logRecord : handler.getLogs()) {
+      actualMessages.append(logRecord.getMessage());
+    }
+    assertThat(actualMessages.toString(), containsString(msg + newlineMsg));

Review Comment:
   nit: could this be equals instead of containsString?
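
To illustrate why the stricter check matters, here is a small sketch in plain Java (hypothetical class and sample strings, no Hamcrest) showing that a substring check still passes when unexpected extra output is present, while an equality check does not. Whether exact equality holds in the real test depends on how the handler splits and trims messages, which this sketch does not model.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring the concatenation loop in the test.
class ExactMatchSketch {
  static String join(List<String> messages) {
    StringBuilder sb = new StringBuilder();
    for (String m : messages) {
      sb.append(m);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    String expected = "ab\ncd";
    // "stale" stands for unexpected extra output that leaked into the logs.
    List<String> logs = Arrays.asList("stale", "ab\n", "cd");
    String actual = join(logs);
    System.out.println(actual.contains(expected)); // substring check passes
    System.out.println(actual.equals(expected)); // equality check fails
  }
}
```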



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.