Repository: nifi Updated Branches: refs/heads/pr/333 [created] 9235a28f8
NIFI-1736 Move kafka.StreamScanner to nifi-utils. This closes #333 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9235a28f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9235a28f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9235a28f Branch: refs/heads/pr/333 Commit: 9235a28f82cece2eeb6de1b4767910d9b8bf8ddc Parents: 3adb45e Author: ijokarumawak <[email protected]> Authored: Thu Apr 7 14:56:41 2016 +0900 Committer: Oleg Zhurakousky <[email protected]> Committed: Thu Apr 7 07:37:28 2016 -0400 ---------------------------------------------------------------------- .../nifi/stream/io/util/StreamScanner.java | 164 +++++++++++++++++++ .../nifi/stream/io/util/StreamScannerTests.java | 130 +++++++++++++++ .../nifi/processors/kafka/KafkaPublisher.java | 5 +- .../nifi/processors/kafka/StreamScanner.java | 164 ------------------- .../processors/kafka/StreamScannerTests.java | 130 --------------- 5 files changed, 297 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java new file mode 100644 index 0000000..901f31a --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io.util; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * + */ +public class StreamScanner { + + private final static int EOF = -1; + + private final InputStream is; + + private final byte[] delimiterBytes; + + private final int maxDataSize; + + private ByteBuffer buffer; + + private byte[] data; + + /** + * Constructs a new instance + * + * @param is + * instance of {@link InputStream} representing the data + * @param delimiterBytes + * byte array representing delimiter bytes used to split the + * input stream. Can be null + * @param maxDataSize + * maximum size of data derived from the input stream. This means + * that neither {@link InputStream} nor its individual chunks (if + * delimiter is used) can ever be greater then this size. + */ + public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) { + this(is, delimiterBytes, maxDataSize, 8192); + } + + /** + * Constructs a new instance + * + * @param is + * instance of {@link InputStream} representing the data + * @param delimiterBytes + * byte array representing delimiter bytes used to split the + * input stream. Can be null + * @param maxDataSize + * maximum size of data derived from the input stream. This means + * that neither {@link InputStream} nor its individual chunks (if + * delimiter is used) can ever be greater then this size. + * @param initialBufferSize + * initial size of the buffer used to buffer {@link InputStream} + * or its parts (if delimiter is used) to create its byte[] + * representation. Must be positive integer. The buffer will grow + * automatically as needed up to the Integer.MAX_VALUE; + * + */ + public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) { + this.is = new BufferedInputStream(is); + this.delimiterBytes = delimiterBytes; + this.buffer = ByteBuffer.allocate(initialBufferSize); + this.maxDataSize = maxDataSize; + } + + /** + * Checks if there are more elements in the stream. This operation is + * idempotent. + * + * @return <i>true</i> if there are more elements in the stream or + * <i>false</i> when it reaches the end of the stream after the last + * element was retrieved via {@link #next()} operation. + */ + public boolean hasNext() { + int j = 0; + int readVal = 0; + while (this.data == null && readVal != EOF) { + this.expandBufferIfNecessary(); + try { + readVal = this.is.read(); + } catch (IOException e) { + throw new IllegalStateException("Failed while reading InputStream", e); + } + if (readVal == EOF) { + this.extractDataToken(0); + } else { + byte byteVal = (byte)readVal; + this.buffer.put(byteVal); + if (this.buffer.position() > this.maxDataSize) { + throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded."); + } + if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) { + if (++j == this.delimiterBytes.length) { + this.extractDataToken(this.delimiterBytes.length); + j = 0; + } + } else { + j = 0; + } + } + } + return this.data != null; + } + + /** + * @return byte array representing the next segment in the stream or the + * whole stream if no delimiter is used + */ + public byte[] next() { + try { + return this.data; + } finally { + this.data = null; + } + } + + /** + * + */ + private void expandBufferIfNecessary() { + if (this.buffer.position() == Integer.MAX_VALUE ){ + throw new IllegalStateException("Internal buffer has reached the capacity and can not be expended any further"); + } + if (this.buffer.remaining() == 0) { + this.buffer.flip(); + int pos = this.buffer.capacity(); + int newSize = this.buffer.capacity() * 2 > Integer.MAX_VALUE ? Integer.MAX_VALUE : this.buffer.capacity() * 2; + ByteBuffer bb = ByteBuffer.allocate(newSize); + bb.put(this.buffer); + this.buffer = bb; + this.buffer.position(pos); + } + } + + /** + * + */ + private void extractDataToken(int lengthSubtract) { + this.buffer.flip(); + if (this.buffer.limit() > 0){ // something must be in the buffer; at least delimiter (e.g., \n) + this.data = new byte[this.buffer.limit() - lengthSubtract]; + this.buffer.get(this.data); + } + this.buffer.clear(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java new file mode 100644 index 0000000..2dc8f0b --- /dev/null +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import org.junit.Assert; +import org.junit.Test; + +public class StreamScannerTests { + + @Test + public void validateWithMultiByteCharsNoDelimiter() { + String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, null, 1000); + assertTrue(scanner.hasNext()); + assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + } + + @Test + public void validateWithComplexDelimiter() { + String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + } + + @Test(expected = IllegalStateException.class) + public void validateMaxBufferSize() { + String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 20); + assertTrue(scanner.hasNext()); + } + + @Test + public void verifyScannerHandlesNegativeOneByteInputs() { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); + StreamScanner scanner = new StreamScanner(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024); + assertTrue(scanner.hasNext()); + Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); + } + + @Test + public void verifyScannerHandlesNegativeOneByteDelimiter() { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); + StreamScanner scanner = new StreamScanner(is, new byte[] { -1 }, 20, 1024); + assertTrue(scanner.hasNext()); + Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0}); + assertTrue(scanner.hasNext()); + Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0}); + } + + @Test + public void validateHasNextIdempotencyWithDelimiter() { + String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000); + for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries + assertTrue(scanner.hasNext()); + } + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertTrue(scanner.hasNext()); + assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + } + + @Test + public void validateHasNextIdempotencyWithoutDelimiter() { + String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, null, 1000); + for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries + assertTrue(scanner.hasNext()); + } + assertTrue(scanner.hasNext()); + assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + } + + @Test + public void validateInternalBufferCanExpend() throws Exception { + String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamScanner scanner = new StreamScanner(is, null, 1000, 2); + Field bufferField = StreamScanner.class.getDeclaredField("buffer"); + bufferField.setAccessible(true); + ByteBuffer buffer = (ByteBuffer) bufferField.get(scanner); + assertEquals(2, buffer.capacity()); + + assertTrue(scanner.hasNext()); + assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); + assertFalse(scanner.hasNext()); + + buffer = (ByteBuffer) bufferField.get(scanner); + assertEquals(128, buffer.capacity()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index ebdf5c8..bcf10a4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.stream.io.util.StreamScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +64,7 @@ class KafkaPublisher implements AutoCloseable { KafkaPublisher(Properties kafkaProperties) { kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - this.producer = new KafkaProducer<byte[], byte[]>(kafkaProperties); + this.producer = new KafkaProducer<>(kafkaProperties); this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2; try { if (kafkaProperties.containsKey("partitioner.class")){ @@ -132,7 +133,7 @@ class KafkaPublisher implements AutoCloseable { partitionKey = this.getPartition(key, topicName); } if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) { - ProducerRecord<byte[], byte[]> message = new ProducerRecord<byte[], byte[]>(topicName, partitionKey, key, content); + ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(topicName, partitionKey, key, content); sendFutures.add(this.toKafka(message)); } segmentCounter++; http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java deleted file mode 100644 index 57bbbcf..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -/** - * - */ -class StreamScanner { - - private final static int EOF = -1; - - private final InputStream is; - - private final byte[] delimiterBytes; - - private final int maxDataSize; - - private ByteBuffer buffer; - - private byte[] data; - - /** - * Constructs a new instance - * - * @param is - * instance of {@link InputStream} representing the data - * @param delimiterBytes - * byte array representing delimiter bytes used to split the - * input stream. Can be null - * @param maxDataSize - * maximum size of data derived from the input stream. This means - * that neither {@link InputStream} nor its individual chunks (if - * delimiter is used) can ever be greater then this size. - */ - StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) { - this(is, delimiterBytes, maxDataSize, 8192); - } - - /** - * Constructs a new instance - * - * @param is - * instance of {@link InputStream} representing the data - * @param delimiterBytes - * byte array representing delimiter bytes used to split the - * input stream. Can be null - * @param maxDataSize - * maximum size of data derived from the input stream. This means - * that neither {@link InputStream} nor its individual chunks (if - * delimiter is used) can ever be greater then this size. - * @param initialBufferSize - * initial size of the buffer used to buffer {@link InputStream} - * or its parts (if delimiter is used) to create its byte[] - * representation. Must be positive integer. The buffer will grow - * automatically as needed up to the Integer.MAX_VALUE; - * - */ - StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) { - this.is = new BufferedInputStream(is); - this.delimiterBytes = delimiterBytes; - this.buffer = ByteBuffer.allocate(initialBufferSize); - this.maxDataSize = maxDataSize; - } - - /** - * Checks if there are more elements in the stream. This operation is - * idempotent. - * - * @return <i>true</i> if there are more elements in the stream or - * <i>false</i> when it reaches the end of the stream after the last - * element was retrieved via {@link #next()} operation. - */ - boolean hasNext() { - int j = 0; - int readVal = 0; - while (this.data == null && readVal != EOF) { - this.expandBufferIfNecessary(); - try { - readVal = this.is.read(); - } catch (IOException e) { - throw new IllegalStateException("Failed while reading InputStream", e); - } - if (readVal == EOF) { - this.extractDataToken(0); - } else { - byte byteVal = (byte)readVal; - this.buffer.put(byteVal); - if (this.buffer.position() > this.maxDataSize) { - throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded."); - } - if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) { - if (++j == this.delimiterBytes.length) { - this.extractDataToken(this.delimiterBytes.length); - j = 0; - } - } else { - j = 0; - } - } - } - return this.data != null; - } - - /** - * @return byte array representing the next segment in the stream or the - * whole stream if no delimiter is used - */ - byte[] next() { - try { - return this.data; - } finally { - this.data = null; - } - } - - /** - * - */ - private void expandBufferIfNecessary() { - if (this.buffer.position() == Integer.MAX_VALUE ){ - throw new IllegalStateException("Internal buffer has reached the capacity and can not be expended any further"); - } - if (this.buffer.remaining() == 0) { - this.buffer.flip(); - int pos = this.buffer.capacity(); - int newSize = this.buffer.capacity() * 2 > Integer.MAX_VALUE ? Integer.MAX_VALUE : this.buffer.capacity() * 2; - ByteBuffer bb = ByteBuffer.allocate(newSize); - bb.put(this.buffer); - this.buffer = bb; - this.buffer.position(pos); - } - } - - /** - * - */ - private void extractDataToken(int lengthSubtract) { - this.buffer.flip(); - if (this.buffer.limit() > 0){ // something must be in the buffer; at least delimiter (e.g., \n) - this.data = new byte[this.buffer.limit() - lengthSubtract]; - this.buffer.get(this.data); - } - this.buffer.clear(); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/9235a28f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java deleted file mode 100644 index 1ebc4c4..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/StreamScannerTests.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; - -import org.junit.Assert; -import org.junit.Test; - -public class StreamScannerTests { - - @Test - public void validateWithMultiByteCharsNoDelimiter() { - String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE."; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, null, 1000); - assertTrue(scanner.hasNext()); - assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - } - - @Test - public void validateWithComplexDelimiter() { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - } - - @Test(expected = IllegalStateException.class) - public void validateMaxBufferSize() { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 20); - assertTrue(scanner.hasNext()); - } - - @Test - public void verifyScannerHandlesNegativeOneByteInputs() { - ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); - StreamScanner scanner = new StreamScanner(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024); - assertTrue(scanner.hasNext()); - Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); - } - - @Test - public void verifyScannerHandlesNegativeOneByteDelimiter() { - ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); - StreamScanner scanner = new StreamScanner(is, new byte[] { -1 }, 20, 1024); - assertTrue(scanner.hasNext()); - Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0}); - assertTrue(scanner.hasNext()); - Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0}); - } - - @Test - public void validateHasNextIdempotencyWithDelimiter() { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000); - for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries - assertTrue(scanner.hasNext()); - } - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - } - - @Test - public void validateHasNextIdempotencyWithoutDelimiter() { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, null, 1000); - for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries - assertTrue(scanner.hasNext()); - } - assertTrue(scanner.hasNext()); - assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - } - - @Test - public void validateInternalBufferCanExpend() throws Exception { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, null, 1000, 2); - Field bufferField = StreamScanner.class.getDeclaredField("buffer"); - bufferField.setAccessible(true); - ByteBuffer buffer = (ByteBuffer) bufferField.get(scanner); - assertEquals(2, buffer.capacity()); - - assertTrue(scanner.hasNext()); - assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - - buffer = (ByteBuffer) bufferField.get(scanner); - assertEquals(128, buffer.capacity()); - } -}
