This is an automated email from the ASF dual-hosted git repository. iuliana pushed a commit to branch revert-1301-streams/stream-gobbler in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 0b0ec04460031d8f735ca1e9095113ceb3962b0c Author: Iuliana Cosmina <[email protected]> AuthorDate: Fri Feb 18 13:37:56 2022 +0000 Revert "Support UTF sequences in stream gobbler" --- .../apache/brooklyn/util/stream/StreamGobbler.java | 83 +++++++--------------- .../brooklyn/util/stream/StreamGobblerTest.java | 71 +++--------------- 2 files changed, 37 insertions(+), 117 deletions(-) diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/stream/StreamGobbler.java b/utils/common/src/main/java/org/apache/brooklyn/util/stream/StreamGobbler.java index 87b1838..6053fa0 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/stream/StreamGobbler.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/stream/StreamGobbler.java @@ -18,21 +18,22 @@ */ package org.apache.brooklyn.util.stream; -import org.slf4j.Logger; - -import java.io.*; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; import java.util.concurrent.atomic.AtomicBoolean; -public class StreamGobbler extends Thread implements Closeable { +import org.slf4j.Logger; - private static final char REPLACEMENT_CHARACTER = 0xfffd; +public class StreamGobbler extends Thread implements Closeable { + protected final InputStream stream; protected final PrintStream out; protected final Logger log; private final AtomicBoolean running = new AtomicBoolean(true); - + public StreamGobbler(InputStream stream, OutputStream out, Logger log) { this(stream, out != null ? new PrintStream(out) : null, log); } @@ -42,7 +43,7 @@ public class StreamGobbler extends Thread implements Closeable { this.out = out; this.log = log; } - + @Override public void close() { running.set(false); @@ -59,56 +60,26 @@ public class StreamGobbler extends Thread implements Closeable { String logPrefix = ""; String printPrefix = ""; - public StreamGobbler setPrefix(String prefix) { setLogPrefix(prefix); setPrintPrefix(prefix); return this; } - public StreamGobbler setPrintPrefix(String prefix) { printPrefix = prefix; return this; } - public StreamGobbler setLogPrefix(String prefix) { logPrefix = prefix; return this; - } - + } + @Override public void run() { - int c, bytes = 0; - char[] utfSymbol = new char[2]; + int c = -1; try { - ByteBuffer bb = ByteBuffer.allocate(4); - while (running.get() && (c = stream.read()) >= 0) { - - if (bytes == 0) { - // Identify utf symbol size by Unicode page. - if (c >= 0xF0) { - bytes = 4; - } else if (c >= 0xE0) { - bytes = 3; - } else if (c >= 0xC2) { - bytes = 2; - } else { - bytes = 1; - } - } - - bb.put((byte) c); - bytes--; - - if (bytes == 0) { - bb.rewind(); - StandardCharsets.UTF_8.decode(bb).get(utfSymbol); - bb.clear(); - onChar(utfSymbol[0]); - if (utfSymbol[1] != 0) { - onChar(utfSymbol[1]); - } - } + while (running.get() && (c=stream.read())>=0) { + onChar(c); } onClose(); } catch (IOException e) { @@ -116,25 +87,23 @@ public class StreamGobbler extends Thread implements Closeable { //TODO parametrise log level, for this error, and for normal messages if (log!=null && log.isTraceEnabled()) log.trace(logPrefix+"exception reading from stream ("+e+")"); } finally { - if (out!=null) out.flush(); + if (out != null) out.flush(); } } - + private final StringBuilder lineSoFar = new StringBuilder(16); - - public void onChar(char c) { - if (c == REPLACEMENT_CHARACTER) return; - if (c == '\n' || c == '\r') { - if (lineSoFar.length() > 0) - // suppress blank lines, so that we can treat either newline char as a line separator - // (e.g. to show curl updates frequently) + public void onChar(int c) { + if (c=='\n' || c=='\r') { + if (lineSoFar.length()>0) + //suppress blank lines, so that we can treat either newline char as a line separator + //(eg to show curl updates frequently) onLine(lineSoFar.toString()); lineSoFar.setLength(0); } else { - lineSoFar.append(c); + lineSoFar.append((char)c); } } - + public void onLine(String line) { //right trim, in case there is \r or other funnies while (line.length()>0 && Character.isWhitespace(line.charAt(line.length()-1))) @@ -147,7 +116,7 @@ public class StreamGobbler extends Thread implements Closeable { if (log!=null && log.isDebugEnabled()) log.debug(logPrefix+line); } } - + public void onClose() { onLine(lineSoFar.toString()); if (out!=null) out.flush(); @@ -155,7 +124,7 @@ public class StreamGobbler extends Thread implements Closeable { finished = true; synchronized (this) { notifyAll(); } } - + private volatile boolean finished = false; /** convenience -- equivalent to calling join() */ diff --git a/utils/common/src/test/java/org/apache/brooklyn/util/stream/StreamGobblerTest.java b/utils/common/src/test/java/org/apache/brooklyn/util/stream/StreamGobblerTest.java index 2438a6d..3ace7be 100644 --- a/utils/common/src/test/java/org/apache/brooklyn/util/stream/StreamGobblerTest.java +++ b/utils/common/src/test/java/org/apache/brooklyn/util/stream/StreamGobblerTest.java @@ -18,71 +18,22 @@ */ package org.apache.brooklyn.util.stream; -import org.apache.brooklyn.test.Asserts; -import org.apache.brooklyn.util.os.Os; -import org.apache.brooklyn.util.text.Strings; -import org.apache.commons.lang3.RandomStringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.Test; - -import java.io.*; - import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -public class StreamGobblerTest { - - private static final Logger LOG = LoggerFactory.getLogger(StreamGobblerTest.class); - - private final String NL = Os.LINE_SEPARATOR; - - private void testStreamGobbler(String text) throws Exception { - LOG.info("Processing text: '{}'", text); - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ByteArrayInputStream in = new ByteArrayInputStream(text.getBytes()); - StreamGobbler streamGobbler = new StreamGobbler(in, out, (Logger) null); - streamGobbler.start(); - streamGobbler.join(5000); - streamGobbler.close(); - out.close(); - - String expected = Strings.isBlank(text) ? "" : text.replace("\t\r","\r").replace("\r","\n") + NL; - Assert.assertEquals(out.toString(), expected); - } - - @Test - public void testUnicodeString() throws Exception { - - // empty - testStreamGobbler(""); - - // single chars - testStreamGobbler(" "); // 1 byte char - testStreamGobbler("ß"); // 2 bytes char - testStreamGobbler("√"); // 3 bytes char - testStreamGobbler("𑱣"); // 4 bytes char - - // duplicate chars - testStreamGobbler(" "); // 2 x (1 byte char) - testStreamGobbler("ßß"); // 2 x (2 bytes char) - testStreamGobbler("√√"); // 2 x (3 bytes char) - testStreamGobbler("𑱣𑱣"); // 2 x (4 bytes char) +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; - // mixed text - testStreamGobbler("옖ʧ"); - testStreamGobbler("aßc√qq1!"); - testStreamGobbler("옖ʧ\t롬㟦密䕎孓"); - testStreamGobbler("їїх\rхфт шф9в 0-ф"); - testStreamGobbler("їїх\t\rхфт шф9в 0-ф"); - testStreamGobbler("a ßßa√√aˆa©aƒa∫a˚\na˙a¬a∆a¥a®a†a. √"); - testStreamGobbler("娨∫√爈ø¨¨\0iubxo𑱣qpihbpπ∫ˆ¨¨øß†a"); - testStreamGobbler(" oubibosu√bfhf иіашвщ, гирф𑱣ііззфххіхіїїх. цйїхз/йї звохй отв 90320к4590е- †a"); +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.os.Os; +import org.apache.brooklyn.util.stream.StreamGobbler; +import org.testng.annotations.Test; - // random text - testStreamGobbler(RandomStringUtils.random(999)); - } +public class StreamGobblerTest { + private String NL = Os.LINE_SEPARATOR; @Test public void testGobbleStream() throws Exception {
