NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIFI-1827, NIFI-1699 implemented new Kafka processors that leverage Kafka 0.9 API - Improved StreamScanner for better performance - Renamed StreamScanner to StreamDemarcator as suggested by Joe - Added failure handling logic to ensure both processors can be reset to their initial state (as if they were just started) - Provided comprehensive test suite to validate various aspects of both Publish and Consume from Kafka - Added relevant javadocs - Added initial additionalDetails docs - Addressed NPE reported by NIFI-1764 - Life-cycle refactoring for the existing PutKafka to ensure producer restart after errors - Incorporated code changes contributed by Ralph Perko (see NIFI-1837) - Addressed partition issue in RoundRobinPartitioner discussed in NIFI-1827 - Updated PropertyDescriptor descriptions to reflect their purpose
NIFI-1296 added @Ignore on some Kafka tests to improve test time NIFI-1296 reworked tests to avoid dependency on embedded Kafka NIFI-1296 fixed spelling error NIFI-1296 fixed trailing whitespaces in non-java files This closes #366 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2d03489e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2d03489e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2d03489e Branch: refs/heads/master Commit: 2d03489ec5dbc56bcb0dbd7b694cdf030d2532a5 Parents: c13fb31 Author: Oleg Zhurakousky <[email protected]> Authored: Thu Apr 7 07:15:25 2016 -0400 Committer: Oleg Zhurakousky <[email protected]> Committed: Thu May 19 07:33:03 2016 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 5 + .../nifi/stream/io/util/StreamDemarcator.java | 191 ++++++++ .../nifi/stream/io/util/StreamScanner.java | 164 ------- .../stream/io/util/StreamDemarcatorTest.java | 227 +++++++++ .../nifi/stream/io/util/StreamScannerTests.java | 130 ------ .../kafka/AbstractKafkaProcessor.java | 145 ++++++ .../nifi/processors/kafka/KafkaPublisher.java | 274 ++++++----- .../nifi/processors/kafka/Partitioners.java | 9 +- .../processors/kafka/PublishingContext.java | 151 ++++++ .../apache/nifi/processors/kafka/PutKafka.java | 257 ++++++----- .../kafka/SplittableMessageContext.java | 123 ----- .../kafka/GetKafkaIntegrationTests.java | 4 + .../processors/kafka/KafkaPublisherTest.java | 126 +++-- .../nifi/processors/kafka/PutKafkaTest.java | 228 ++++++++++ .../kafka/SplittableMessageContextTest.java | 66 --- .../nifi/processors/kafka/TestPutKafka.java | 270 ----------- .../nifi-kafka-pubsub-nar/pom.xml | 35 ++ .../src/main/resources/META-INF/LICENSE | 299 ++++++++++++ .../src/main/resources/META-INF/NOTICE | 72 +++ .../nifi-kafka-pubsub-processors/pom.xml | 79 ++++ .../kafka/pubsub/AbstractKafkaProcessor.java | 334 ++++++++++++++ 
.../processors/kafka/pubsub/ConsumeKafka.java | 242 ++++++++++ .../processors/kafka/pubsub/KafkaPublisher.java | 232 ++++++++++ .../processors/kafka/pubsub/Partitioners.java | 61 +++ .../processors/kafka/pubsub/PublishKafka.java | 359 +++++++++++++++ .../kafka/pubsub/PublishingContext.java | 139 ++++++ .../org.apache.nifi.processor.Processor | 16 + .../additionalDetails.html | 33 ++ .../additionalDetails.html | 47 ++ .../AbstractKafkaProcessorLifecycelTest.java | 456 +++++++++++++++++++ .../kafka/pubsub/ConsumeKafkaTest.java | 167 +++++++ .../kafka/pubsub/KafkaPublisherTest.java | 302 ++++++++++++ .../kafka/pubsub/PublishKafkaTest.java | 254 +++++++++++ .../kafka/pubsub/PublishingContextTest.java | 106 +++++ .../kafka/pubsub/StubConsumeKafka.java | 71 +++ .../kafka/pubsub/StubPublishKafka.java | 95 ++++ .../nifi/processors/kafka/pubsub/TestUtils.java | 46 ++ .../processors/kafka/test/EmbeddedKafka.java | 226 +++++++++ .../kafka/test/EmbeddedKafkaProducerHelper.java | 110 +++++ .../src/test/resources/log4j.properties | 21 + .../src/test/resources/server.properties | 121 +++++ .../src/test/resources/zookeeper.properties | 20 + nifi-nar-bundles/nifi-kafka-bundle/pom.xml | 7 + pom.xml | 6 + 44 files changed, 5306 insertions(+), 1020 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2d03489e/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 529b588..d07dfb1 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -159,6 +159,11 @@ language governing permissions and limitations under the License. 
--> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kafka-pubsub-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-http-context-map-nar</artifactId> <type>nar</type> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/2d03489e/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java new file mode 100644 index 0000000..3064f1c --- /dev/null +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io.util; + +import java.io.IOException; +import java.io.InputStream; + +/** + * The <code>StreamDemarcator</code> class takes an input stream and demarcates + * it so it could be read (see {@link #nextToken()}) as individual byte[] + * demarcated by the provided delimiter. 
/**
 * Takes an {@link InputStream} and demarcates it so it can be read (see
 * {@link #nextToken()}) as individual byte[] tokens separated by the provided
 * delimiter. If no delimiter is provided the entire stream is read into a
 * single token, which may result in {@link OutOfMemoryError} if the stream is
 * too large.
 * <p>
 * Delimiter matching uses a Knuth-Morris-Pratt failure table so delimiters
 * whose own prefix re-occurs inside them (e.g. "aab" inside "aaab", or "ab"
 * inside "xaab") are still found; the previous reset-to-zero scan silently
 * missed such matches and returned wrongly merged tokens.
 * <p>
 * This class is not thread-safe.
 */
public class StreamDemarcator {

    private final static int INIT_BUFFER_SIZE = 8192;

    private final InputStream is;

    private final byte[] delimiterBytes;

    // KMP failure table for delimiterBytes; null when no delimiter is used.
    private final int[] delimiterFailureTable;

    private final int maxDataSize;

    private final int initialBufferSize;

    private byte[] buffer;

    // Read position within 'buffer'.
    private int index;

    // Start of the current (not yet extracted) token within 'buffer'.
    private int mark;

    // Exclusive end of valid data within 'buffer'.
    private int readAheadLength;

    /**
     * Constructs a new instance with the default initial buffer size.
     *
     * @param is
     *            instance of {@link InputStream} representing the data
     * @param delimiterBytes
     *            byte array representing delimiter bytes used to split the
     *            input stream. Can be null.
     * @param maxDataSize
     *            maximum size of data derived from the input stream. This means
     *            that neither the {@link InputStream} nor its individual chunks
     *            (if a delimiter is used) can ever be greater than this size.
     */
    public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize) {
        this(is, delimiterBytes, maxDataSize, INIT_BUFFER_SIZE);
    }

    /**
     * Constructs a new instance.
     *
     * @param is
     *            instance of {@link InputStream} representing the data
     * @param delimiterBytes
     *            byte array representing delimiter bytes used to split the
     *            input stream. Can be null.
     * @param maxDataSize
     *            maximum size of data derived from the input stream
     * @param initialBufferSize
     *            initial size of the internal buffer; must be a positive
     *            integer. The buffer grows automatically as needed.
     * @throws IllegalArgumentException
     *             if 'is' is null, 'maxDataSize' or 'initialBufferSize' is not
     *             positive, or a non-null delimiter is empty
     */
    public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
        this.validateInput(is, delimiterBytes, maxDataSize, initialBufferSize);
        this.is = is;
        this.delimiterBytes = delimiterBytes;
        this.delimiterFailureTable = delimiterBytes == null ? null : buildFailureTable(delimiterBytes);
        this.initialBufferSize = initialBufferSize;
        this.buffer = new byte[initialBufferSize];
        this.maxDataSize = maxDataSize;
    }

    /**
     * Reads the next data token from the {@link InputStream}, returning null
     * when the end of the stream is reached. Zero-length segments (consecutive
     * delimiters) are skipped, matching the original contract.
     */
    public byte[] nextToken() {
        byte[] data = null;
        int j = 0; // number of delimiter bytes currently matched

        while (data == null && this.buffer != null) {
            if (this.index >= this.readAheadLength) {
                this.fill();
            }
            if (this.index >= this.readAheadLength) {
                // End of stream: whatever remains between 'mark' and 'index' is the last token.
                data = this.extractDataToken(0);
                this.buffer = null; // terminal state; subsequent calls return null
            } else {
                byte byteVal = this.buffer[this.index++];
                if (this.delimiterBytes != null) {
                    // KMP step: on mismatch fall back through the failure table instead of
                    // resetting to 0, so overlapping delimiter prefixes are not lost.
                    while (j > 0 && this.delimiterBytes[j] != byteVal) {
                        j = this.delimiterFailureTable[j - 1];
                    }
                    if (this.delimiterBytes[j] == byteVal && ++j == this.delimiterBytes.length) {
                        data = this.extractDataToken(this.delimiterBytes.length);
                        this.mark = this.index;
                        j = 0;
                    }
                }
            }
        }
        return data;
    }

    /**
     * Fills the buffer from the underlying stream starting at 'index', first
     * expanding the buffer (when no consumed bytes can be discarded) or sliding
     * the unconsumed tail to the front (when 'mark' > 0).
     */
    private void fill() {
        if (this.index >= this.buffer.length) {
            if (this.mark == 0) { // nothing consumed yet; grow the buffer
                byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
                this.buffer = newBuff;
            } else { // discard already-extracted bytes by sliding the tail to the front
                int length = this.index - this.mark;
                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
                this.index = length;
                this.mark = 0;
            }
        }

        try {
            int bytesRead;
            do { // retry on a 0-byte read rather than treating it as end of stream
                bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
            } while (bytesRead == 0);

            if (bytesRead != -1) {
                this.readAheadLength = this.index + bytesRead;
                // NOTE(review): the limit applies to the total buffered bytes (including
                // already-extracted data before 'mark' until the next slide), so it can trip
                // slightly early for multi-token streams -- preserved from the original contract.
                if (this.readAheadLength > this.maxDataSize) {
                    throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
                }
            }
        } catch (IOException e) {
            throw new IllegalStateException("Failed while reading InputStream", e);
        }
    }

    /**
     * Extracts the data token between the current 'mark' and 'index', minus
     * 'lengthSubtract' (the delimiter length, if any). Returns null when the
     * resulting length is 0 so empty segments are skipped by the caller.
     */
    private byte[] extractDataToken(int lengthSubtract) {
        byte[] data = null;
        int length = this.index - this.mark - lengthSubtract;
        if (length > 0) {
            data = new byte[length];
            System.arraycopy(this.buffer, this.mark, data, 0, data.length);
        }
        return data;
    }

    /**
     * Builds the KMP failure table: fail[i] is the length of the longest proper
     * prefix of delimiter[0..i] that is also a suffix of it.
     */
    private static int[] buildFailureTable(byte[] delimiter) {
        int[] fail = new int[delimiter.length];
        int k = 0;
        for (int i = 1; i < delimiter.length; i++) {
            while (k > 0 && delimiter[i] != delimiter[k]) {
                k = fail[k - 1];
            }
            if (delimiter[i] == delimiter[k]) {
                k++;
            }
            fail[i] = k;
        }
        return fail;
    }

    /**
     * Validates constructor arguments; see constructor javadoc for the rules.
     */
    private void validateInput(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
        if (is == null) {
            throw new IllegalArgumentException("'is' must not be null");
        } else if (maxDataSize <= 0) {
            throw new IllegalArgumentException("'maxDataSize' must be > 0");
        } else if (initialBufferSize <= 0) {
            throw new IllegalArgumentException("'initialBufferSize' must be > 0");
        } else if (delimiterBytes != null && delimiterBytes.length == 0) {
            throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
        }
    }
}
/**
 * Splits an {@link InputStream} into byte[] segments separated by an optional
 * delimiter, exposed through an Iterator-like {@link #hasNext()} /
 * {@link #next()} pair. {@link #hasNext()} is idempotent.
 */
public class StreamScanner {

    private final static int EOF = -1;

    private final InputStream is;

    private final byte[] delimiterBytes;

    private final int maxDataSize;

    private ByteBuffer buffer;

    // Segment produced by the last successful hasNext(); consumed by next().
    private byte[] data;

    /**
     * Constructs a new instance with a default initial buffer size of 8192.
     *
     * @param is
     *            instance of {@link InputStream} representing the data
     * @param delimiterBytes
     *            byte array representing delimiter bytes used to split the
     *            input stream. Can be null.
     * @param maxDataSize
     *            maximum size of data derived from the input stream
     */
    public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize) {
        this(is, delimiterBytes, maxDataSize, 8192);
    }

    /**
     * Constructs a new instance.
     *
     * @param is
     *            instance of {@link InputStream} representing the data
     * @param delimiterBytes
     *            byte array representing delimiter bytes used to split the
     *            input stream. Can be null.
     * @param maxDataSize
     *            maximum size of data derived from the input stream
     * @param initialBufferSize
     *            initial size of the internal buffer; grows (by doubling) as
     *            needed
     */
    public StreamScanner(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
        this.is = new BufferedInputStream(is);
        this.delimiterBytes = delimiterBytes;
        this.buffer = ByteBuffer.allocate(initialBufferSize);
        this.maxDataSize = maxDataSize;
    }

    /**
     * Checks if there are more elements in the stream. Idempotent: repeated
     * calls without an intervening {@link #next()} keep returning the same
     * answer.
     *
     * @return true if another segment is available, false once the end of the
     *         stream has been reached after the last segment was retrieved
     */
    public boolean hasNext() {
        int matchedDelimiterBytes = 0;
        int lastRead = 0;
        while (this.data == null && lastRead != EOF) {
            this.expandBufferIfNecessary();
            try {
                lastRead = this.is.read();
            } catch (IOException e) {
                throw new IllegalStateException("Failed while reading InputStream", e);
            }
            if (lastRead == EOF) {
                // End of stream: whatever is buffered (possibly nothing) is the last segment.
                this.extractDataToken(0);
                continue; // loop condition terminates
            }
            byte currentByte = (byte) lastRead;
            this.buffer.put(currentByte);
            if (this.buffer.position() > this.maxDataSize) {
                throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
            }
            if (this.delimiterBytes != null && this.delimiterBytes[matchedDelimiterBytes] == currentByte) {
                matchedDelimiterBytes++;
                if (matchedDelimiterBytes == this.delimiterBytes.length) {
                    this.extractDataToken(this.delimiterBytes.length);
                    matchedDelimiterBytes = 0;
                }
            } else {
                matchedDelimiterBytes = 0;
            }
        }
        return this.data != null;
    }

    /**
     * @return byte array representing the next segment in the stream, or the
     *         whole stream if no delimiter is used; clears the pending segment
     */
    public byte[] next() {
        byte[] token = this.data;
        this.data = null;
        return token;
    }

    /**
     * Doubles the backing buffer once it is full, preserving its contents and
     * write position.
     */
    private void expandBufferIfNecessary() {
        if (this.buffer.position() == Integer.MAX_VALUE) {
            throw new IllegalStateException("Internal buffer has reached the capacity and can not be expended any further");
        }
        if (this.buffer.remaining() > 0) {
            return;
        }
        this.buffer.flip();
        final int oldCapacity = this.buffer.capacity();
        // (the original guard 'capacity * 2 > Integer.MAX_VALUE' can never be true
        // for int arithmetic, so plain doubling is behaviorally identical)
        ByteBuffer grown = ByteBuffer.allocate(oldCapacity * 2);
        grown.put(this.buffer);
        this.buffer = grown;
        this.buffer.position(oldCapacity);
    }

    /**
     * Extracts the buffered bytes (minus 'lengthSubtract' delimiter bytes) as
     * the pending segment, then resets the buffer. Leaves 'data' untouched when
     * the buffer is empty.
     */
    private void extractDataToken(int lengthSubtract) {
        this.buffer.flip();
        final int available = this.buffer.limit();
        if (available > 0) { // something must be in the buffer; at least the delimiter
            this.data = new byte[available - lengthSubtract];
            this.buffer.get(this.data);
        }
        this.buffer.clear();
    }
}
+ */ +package org.apache.nifi.stream.io.util; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import org.junit.Test; + +public class StreamDemarcatorTest { + + @Test + public void validateInitializationFailure() { + try { + new StreamDemarcator(null, null, -1); + fail(); + } catch (IllegalArgumentException e) { + // success + } + + try { + new StreamDemarcator(mock(InputStream.class), null, -1); + fail(); + } catch (IllegalArgumentException e) { + // success + } + + try { + new StreamDemarcator(mock(InputStream.class), null, 10, -1); + fail(); + } catch (IllegalArgumentException e) { + // success + } + + try { + new StreamDemarcator(mock(InputStream.class), new byte[0], 10, 1); + fail(); + } catch (IllegalArgumentException e) { + // success + } + } + + @Test + public void validateNoDelimiter() { + String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, null, 1000); + assertTrue(Arrays.equals(data.getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + // validate that subsequent invocations of nextToken() do not result in exception + assertNull(scanner.nextToken()); + assertNull(scanner.nextToken()); + } + + @Test + public void validateNoDelimiterSmallInitialBuffer() { + String data = "Learn from yesterday, live for today, hope for tomorrow. 
The important thing is not to stop questioning."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 1); + assertTrue(Arrays.equals(data.getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + } + + @Test + public void validateSingleByteDelimiter() { + String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000); + assertTrue(Arrays.equals("Learn from yesterday".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals(" live for today".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals(" hope for tomorrow. The important thing is not to stop questioning.".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertNull(scanner.nextToken()); + } + + @Test + public void validateDelimiterAtTheBeginning() { + String data = ",Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000); + assertTrue(Arrays.equals("Learn from yesterday".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals(" live for today".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals(" hope for tomorrow. The important thing is not to stop questioning.".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertNull(scanner.nextToken()); + } + + @Test + public void validateEmptyDelimiterSegments() { + String data = ",,,,,Learn from yesterday, live for today, hope for tomorrow. 
The important thing is not to stop questioning."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000); + assertTrue(Arrays.equals("Learn from yesterday".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals(" live for today".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals(" hope for tomorrow. The important thing is not to stop questioning.".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertNull(scanner.nextToken()); + } + + @Test + public void validateSingleByteDelimiterSmallInitialBuffer() { + String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000, 2); + assertTrue(Arrays.equals("Learn from yesterday".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals(" live for today".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals(" hope for tomorrow. 
The important thing is not to stop questioning.".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertNull(scanner.nextToken()); + } + + @Test + public void validateWithMultiByteDelimiter() { + String data = "foodaabardaabazzz"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000); + assertTrue(Arrays.equals("foo".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals("bar".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals("bazzz".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertNull(scanner.nextToken()); + } + + @Test + public void validateWithMultiByteDelimiterAtTheBeginning() { + String data = "daafoodaabardaabazzz"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000); + assertTrue(Arrays.equals("foo".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals("bar".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals("bazzz".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertNull(scanner.nextToken()); + } + + @Test + public void validateWithMultiByteDelimiterSmallInitialBuffer() { + String data = "foodaabarffdaabazz"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000, 1); + assertTrue(Arrays.equals("foo".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals("barff".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertTrue(Arrays.equals("bazz".getBytes(StandardCharsets.UTF_8), scanner.nextToken())); + assertNull(scanner.nextToken()); + } + + @Test + 
public void validateWithMultiByteCharsNoDelimiter() { + String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, null, 1000); + byte[] next = scanner.nextToken(); + assertNotNull(next); + assertEquals(data, new String(next, StandardCharsets.UTF_8)); + assertNull(scanner.nextToken()); + } + + @Test + public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() { + String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE."; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8)); + StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 2); + byte[] next = scanner.nextToken(); + assertNotNull(next); + assertEquals(data, new String(next, StandardCharsets.UTF_8)); + assertNull(scanner.nextToken()); + } + + @Test + public void validateWithComplexDelimiter() { + String data = "THIS IS MY TEXT<MYDEIMITER>THIS IS MY NEW TEXT<MYDEIMITER>THIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamDemarcator scanner = new StreamDemarcator(is, "<MYDEIMITER>".getBytes(StandardCharsets.UTF_8), 1000); + assertEquals("THIS IS MY TEXT", new String(scanner.nextToken(), StandardCharsets.UTF_8)); + assertEquals("THIS IS MY NEW TEXT", new String(scanner.nextToken(), StandardCharsets.UTF_8)); + assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.nextToken(), StandardCharsets.UTF_8)); + assertNull(scanner.nextToken()); + } + + @Test(expected = IllegalStateException.class) + public void validateMaxBufferSize() { + String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; + ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); + StreamDemarcator scanner = new StreamDemarcator(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 20); + scanner.nextToken(); + } + + @Test + public void 
validateScannerHandlesNegativeOneByteInputsNoDelimiter() { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 }); + StreamDemarcator scanner = new StreamDemarcator(is, null, 20); + byte[] b = scanner.nextToken(); + assertArrayEquals(b, new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 }); + } + + @Test + public void validateScannerHandlesNegativeOneByteInputs() { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 }); + StreamDemarcator scanner = new StreamDemarcator(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024); + byte[] b = scanner.nextToken(); + assertArrayEquals(b, new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 }); + } + + @Test + public void verifyScannerHandlesNegativeOneByteDelimiter() { + ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 }); + StreamDemarcator scanner = new StreamDemarcator(is, new byte[] { -1 }, 20, 1024); + assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0, 0 }); + assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0 }); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2d03489e/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java deleted file mode 100644 index 2dc8f0b..0000000 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayInputStream; -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; - -import org.junit.Assert; -import org.junit.Test; - -public class StreamScannerTests { - - @Test - public void validateWithMultiByteCharsNoDelimiter() { - String data = "å THIS IS MY NEW TEXT.å IT HAS A NEWLINE."; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, null, 1000); - assertTrue(scanner.hasNext()); - assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - } - - @Test - public void validateWithComplexDelimiter() { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - 
assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - } - - @Test(expected = IllegalStateException.class) - public void validateMaxBufferSize() { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 20); - assertTrue(scanner.hasNext()); - } - - @Test - public void verifyScannerHandlesNegativeOneByteInputs() { - ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); - StreamScanner scanner = new StreamScanner(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024); - assertTrue(scanner.hasNext()); - Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); - } - - @Test - public void verifyScannerHandlesNegativeOneByteDelimiter() { - ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 0, -1, 0, 0, 0}); - StreamScanner scanner = new StreamScanner(is, new byte[] { -1 }, 20, 1024); - assertTrue(scanner.hasNext()); - Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0}); - assertTrue(scanner.hasNext()); - Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0}); - } - - @Test - public void validateHasNextIdempotencyWithDelimiter() { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000); - for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries - assertTrue(scanner.hasNext()); - } - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY TEXT", new String(scanner.next(), 
StandardCharsets.UTF_8)); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertTrue(scanner.hasNext()); - assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - } - - @Test - public void validateHasNextIdempotencyWithoutDelimiter() { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, null, 1000); - for (int i = 0; i < 5; i++) { // we only have 3 segments so unless idempotent hasNext would return false after 3 tries - assertTrue(scanner.hasNext()); - } - assertTrue(scanner.hasNext()); - assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - } - - @Test - public void validateInternalBufferCanExpend() throws Exception { - String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT"; - ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes()); - StreamScanner scanner = new StreamScanner(is, null, 1000, 2); - Field bufferField = StreamScanner.class.getDeclaredField("buffer"); - bufferField.setAccessible(true); - ByteBuffer buffer = (ByteBuffer) bufferField.get(scanner); - assertEquals(2, buffer.capacity()); - - assertTrue(scanner.hasNext()); - assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8)); - assertFalse(scanner.hasNext()); - - buffer = (ByteBuffer) bufferField.get(scanner); - assertEquals(128, buffer.capacity()); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/2d03489e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java new file mode 100644 index 0000000..5a470b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka; + +import java.io.Closeable; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * Base class for {@link Processor}s to publish and consume messages from Kafka + * + * @see PutKafka + */ +abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor { + + + private volatile boolean acceptTask = true; + + private final AtomicInteger taskCounter = new AtomicInteger(); + + + /** + * @see KafkaPublisher + */ + volatile T kafkaResource; + + /** + * + */ + @Override + public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaResource' can be reset before new tasks are accepted. + this.taskCounter.incrementAndGet(); + final ProcessSession session = sessionFactory.createSession(); + try { + /* + * We can't be doing double null check here since as a pattern + * it only works for lazy init but not reset, which is what we + * are doing here. In fact the first null check is dangerous + * since 'kafkaResource' can become null right after its null + * check passed causing subsequent NPE. + */ + synchronized (this) { + if (this.kafkaResource == null) { + this.kafkaResource = this.buildKafkaResource(context, session); + } + } + + /* + * The 'processed' boolean flag does not imply any failure or success. 
It simply states that: + * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated + * - PublishKafka - some messages were sent to Kafka based on existence of the input FlowFile + */ + boolean processed = this.rendezvousWithKafka(context, session); + session.commit(); + if (processed) { + this.postCommit(context); + } else { + context.yield(); + } + } catch (Throwable e) { + this.acceptTask = false; + session.rollback(true); + this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[] { this, e }); + } finally { + synchronized (this) { + if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) { + this.close(); + this.acceptTask = true; + } + } + } + } else { + context.yield(); + } + } + + /** + * Will call {@link Closeable#close()} on the target resource after which + * the target resource will be set to null. Should only be called when there + * are no more threads being executed on this processor or when it has been + * verified that only a single thread remains. + * + * @see KafkaPublisher + */ + @OnStopped + public void close() { + if (this.taskCounter.get() == 0) { + try { + if (this.kafkaResource != null) { + try { + this.kafkaResource.close(); + } catch (Exception e) { + this.getLogger().warn("Failed while closing " + this.kafkaResource, e); + } + } + } finally { + this.kafkaResource = null; + } + } + } + + /** + * This operation will be executed after {@link ProcessSession#commit()} has + * been called. + */ + protected void postCommit(ProcessContext context) { + + } + + /** + * This operation is called from + * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} method and + * contains main processing logic for this Processor. + */ + protected abstract boolean rendezvousWithKafka(ProcessContext context, ProcessSession session); + + /** + * Builds target resource for interacting with Kafka. 
The target resource + * could be one of {@link KafkaPublisher} or {@link KafkaConsumer} + */ + protected abstract T buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2d03489e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java index afb2cc6..dac5804 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -16,9 +16,9 @@ */ package org.apache.nifi.processors.kafka; +import java.io.Closeable; import java.io.InputStream; import java.util.ArrayList; -import java.util.BitSet; import java.util.List; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -27,47 +27,50 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.stream.io.util.StreamScanner; +import org.apache.nifi.stream.io.util.StreamDemarcator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
-import kafka.producer.KeyedMessage; import kafka.producer.Partitioner; /** - * Wrapper over {@link KafkaProducer} to assist {@link PutKafka} processor with - * sending content of {@link FlowFile}s to Kafka. + * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor + * with sending contents of the {@link FlowFile}s to Kafka. */ -class KafkaPublisher implements AutoCloseable { +class KafkaPublisher implements Closeable { private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); - private final KafkaProducer<byte[], byte[]> producer; + private final Producer<byte[], byte[]> kafkaProducer; - private final Partitioner partitioner; - - private final long ackWaitTime; + private long ackWaitTime = 30000; private ProcessorLog processLog; + private final Partitioner partitioner; + /** * Creates an instance of this class as well as the instance of the * corresponding Kafka {@link KafkaProducer} using provided Kafka * configuration properties. + * + * @param kafkaProperties + * instance of {@link Properties} used to bootstrap + * {@link KafkaProducer} */ KafkaPublisher(Properties kafkaProperties) { kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - this.producer = new KafkaProducer<>(kafkaProperties); - this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2; + this.kafkaProducer = new KafkaProducer<>(kafkaProperties); try { - if (kafkaProperties.containsKey("partitioner.class")){ + if (kafkaProperties.containsKey("partitioner.class")) { this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance(); } else { this.partitioner = null; @@ -78,152 +81,181 @@ class KafkaPublisher implements AutoCloseable { } /** - * - */ - void setProcessLog(ProcessorLog processLog) { - this.processLog = 
processLog; - } - - /** - * Publishes messages to Kafka topic. It supports three publishing - * mechanisms. + * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to + * determine how many messages to Kafka will be sent from a provided + * {@link InputStream} (see {@link PublishingContext#getContentStream()}). + * It supports two publishing modes: * <ul> - * <li>Sending the entire content stream as a single Kafka message.</li> - * <li>Splitting the incoming content stream into chunks and sending - * individual chunks as separate Kafka messages.</li> - * <li>Splitting the incoming content stream into chunks and sending only - * the chunks that have failed previously @see - * {@link SplittableMessageContext#getFailedSegments()}.</li> + * <li>Sending all messages constructed from + * {@link StreamDemarcator#nextToken()} operation.</li> + * <li>Sending only unacknowledged messages constructed from + * {@link StreamDemarcator#nextToken()} operation.</li> * </ul> + * The unacknowledged messages are determined from the value of + * {@link PublishingContext#getLastAckedMessageIndex()}. + * <br> * This method assumes content stream affinity where it is expected that the * content stream that represents the same Kafka message(s) will remain the * same across possible retries. This is required specifically for cases * where delimiter is used and a single content stream may represent - * multiple Kafka messages. The failed segment list will keep the index of - * of each content stream segment that had failed to be sent to Kafka, so - * upon retry only the failed segments are sent. + * multiple Kafka messages. The + * {@link PublishingContext#getLastAckedMessageIndex()} will provide the + * index of the last ACKed message, so upon retry only messages with the + * higher index are sent. 
* - * @param messageContext - * instance of {@link SplittableMessageContext} which hold - * context information about the message to be sent - * @param contentStream - * instance of open {@link InputStream} carrying the content of - * the message(s) to be send to Kafka - * @param partitionKey - * the value of the partition key. Only relevant is user wishes - * to provide a custom partition key instead of relying on - * variety of provided {@link Partitioner}(s) - * @param maxBufferSize maximum message size - * @return The set containing the failed segment indexes for messages that - * failed to be sent to Kafka. + * @param publishingContext + * instance of {@link PublishingContext} which hold context + * information about the message(s) to be sent. + * @return The index of the last successful offset. */ - BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey, - int maxBufferSize) { - List<Future<RecordMetadata>> sendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize); - return this.publish(sendFutures); + KafkaPublisherResult publish(PublishingContext publishingContext) { + StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(), + publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize()); + + int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex(); + List<Future<RecordMetadata>> resultFutures = new ArrayList<>(); + + byte[] messageBytes; + int tokenCounter = 0; + for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) { + if (prevLastAckedMessageIndex < tokenCounter) { + Integer partitionId = publishingContext.getPartitionId(); + if (partitionId == null && publishingContext.getKeyBytes() != null) { + partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic()); + } + ProducerRecord<byte[], byte[]> message = + new 
ProducerRecord<>(publishingContext.getTopic(), publishingContext.getPartitionId(), publishingContext.getKeyBytes(), messageBytes); + resultFutures.add(this.kafkaProducer.send(message)); + } + } + + int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); + return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); } /** - * This method splits (if required) the incoming content stream into - * messages to publish to Kafka topic. See publish method for more - * details + * Sets the time this publisher will wait for the {@link Future#get()} + * operation (the Future returned by + * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing + * out. * - * @param messageContext - * instance of {@link SplittableMessageContext} which hold - * context information about the message to be sent - * @param contentStream - * instance of open {@link InputStream} carrying the content of - * the message(s) to be send to Kafka - * @param partitionKey - * the value of the partition key. Only relevant is user wishes - * to provide a custom partition key instead of relying on - * variety of provided {@link Partitioner}(s) - * @param maxBufferSize maximum message size - * @return The list of messages to publish + * This value will also be used as a timeout when closing the underlying + * {@link KafkaProducer}. See {@link #close()}. 
*/ - List<Future<RecordMetadata>> split(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey, - int maxBufferSize) { - List<Future<RecordMetadata>> sendFutures = new ArrayList<>(); - BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments(); - int segmentCounter = 0; - StreamScanner scanner = new StreamScanner(contentStream, messageContext.getDelimiterBytes(), maxBufferSize); - - while (scanner.hasNext()) { - byte[] content = scanner.next(); - if (content.length > 0){ - byte[] key = messageContext.getKeyBytes(); - String topicName = messageContext.getTopicName(); - if (partitionKey == null && key != null) { - partitionKey = this.getPartition(key, topicName); - } - if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) { - ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(topicName, partitionKey, key, content); - sendFutures.add(this.toKafka(message)); - } - segmentCounter++; - } - } - return sendFutures; + void setAckWaitTime(long ackWaitTime) { + this.ackWaitTime = ackWaitTime; } /** + * This operation will process ACKs from Kafka in the order in which + * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning + * the index of the last ACKed message. Within this operation processing ACK + * simply means successful invocation of 'get()' operation on the + * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)} + * operation. Upon encountering any type of error while interrogating such + * {@link Future} the ACK loop will end. Messages that were not ACKed would + * be considered non-delivered and therefore could be resent at the later + * time. + * + * @param sendFutures + * list of {@link Future}s representing results of publishing to + * Kafka * + * @param lastAckMessageIndex + * the index of the last ACKed message. It is important to + * provide the last ACKed message especially while re-trying so + * the proper index is maintained. 
*/ - BitSet publish(List<Future<RecordMetadata>> sendFutures) { - int segmentCounter = 0; - BitSet failedSegments = new BitSet(); - for (Future<RecordMetadata> future : sendFutures) { + private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) { + boolean exceptionThrown = false; + for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) { + Future<RecordMetadata> future = sendFutures.get(segmentCounter); try { future.get(this.ackWaitTime, TimeUnit.MILLISECONDS); + lastAckMessageIndex++; } catch (InterruptedException e) { - failedSegments.set(segmentCounter); + exceptionThrown = true; Thread.currentThread().interrupt(); - logger.warn("Interrupted while waiting for acks from Kafka"); - if (this.processLog != null) { - this.processLog.warn("Interrupted while waiting for acks from Kafka"); - } + this.warnOrError("Interrupted while waiting for acks from Kafka", null); } catch (ExecutionException e) { - failedSegments.set(segmentCounter); - logger.error("Failed while waiting for acks from Kafka", e); - if (this.processLog != null) { - this.processLog.error("Failed while waiting for acks from Kafka", e); - } + exceptionThrown = true; + this.warnOrError("Failed while waiting for acks from Kafka", e); } catch (TimeoutException e) { - failedSegments.set(segmentCounter); - logger.warn("Timed out while waiting for acks from Kafka"); - if (this.processLog != null) { - this.processLog.warn("Timed out while waiting for acks from Kafka"); - } + exceptionThrown = true; + this.warnOrError("Timed out while waiting for acks from Kafka", null); } - segmentCounter++; } - return failedSegments; + + return lastAckMessageIndex; } /** - * + * Will close the underlying {@link KafkaProducer} */ - private int getPartition(Object key, String topicName) { - int partSize = this.producer.partitionsFor(topicName).size(); - return this.partitioner.partition(key, partSize); + @Override + public void close() { + 
this.kafkaProducer.close(); } /** - * Closes {@link KafkaProducer} + * Will set {@link ProcessorLog} as an additional logger to forward log + * messages to NiFi bulletin */ - @Override - public void close() throws Exception { - this.producer.close(); + void setProcessLog(ProcessorLog processLog) { + this.processLog = processLog; } /** - * Sends the provided {@link KeyedMessage} to Kafka async returning - * {@link Future} + * */ - private Future<RecordMetadata> toKafka(ProducerRecord<byte[], byte[]> message) { - if (logger.isDebugEnabled()) { - logger.debug("Publishing message to '" + message.topic() + "' topic."); + private void warnOrError(String message, Exception e) { + if (e == null) { + logger.warn(message); + if (this.processLog != null) { + this.processLog.warn(message); + } + } else { + logger.error(message, e); + if (this.processLog != null) { + this.processLog.error(message, e); + } + } + } + + static class KafkaPublisherResult { + private final int messagesSent; + private final int lastMessageAcked; + KafkaPublisherResult(int messagesSent, int lastMessageAcked) { + this.messagesSent = messagesSent; + this.lastMessageAcked = lastMessageAcked; + } + + public int getMessagesSent() { + return this.messagesSent; + } + + public int getLastMessageAcked() { + return this.lastMessageAcked; + } + + public boolean isAllAcked() { + return this.messagesSent - 1 == this.lastMessageAcked; + } + + @Override + public String toString() { + return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked; + } + } + + /** + * + */ + private int getPartition(Object key, String topicName) { + if (this.partitioner != null) { + int partSize = this.kafkaProducer.partitionsFor(topicName).size(); + return this.partitioner.partition(key, partSize); } - return this.producer.send(message); + return 0; } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/2d03489e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java index 2a851a4..32d3606 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java @@ -40,12 +40,11 @@ final public class Partitioners { return partitionIndex; } - private int next(int numberOfPartitions) { - if (index == numberOfPartitions) { - index = 0; + private synchronized int next(int numberOfPartitions) { + if (this.index >= numberOfPartitions) { + this.index = 0; } - int indexToReturn = index++; - return indexToReturn; + return index++; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2d03489e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java new file mode 100644 index 0000000..914ac1a --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +/** + * Holder of context information used by {@link KafkaPublisher} required to + * publish messages to Kafka. + */ +class PublishingContext { + + private final InputStream contentStream; + + private final String topic; + + private final int lastAckedMessageIndex; + + private volatile Integer partitionId; + + /* + * We're using the default value from Kafka. We are using it to control the + * message size before it goes to to Kafka thus limiting possibility of a + * late failures in Kafka client. 
+ */ + private volatile int maxRequestSize = 1048576; // kafka default + + private volatile boolean maxRequestSizeSet; + + private volatile byte[] keyBytes; + + private volatile byte[] delimiterBytes; + + + + PublishingContext(InputStream contentStream, String topic) { + this(contentStream, topic, -1); + } + + PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) { + this.validateInput(contentStream, topic, lastAckedMessageIndex); + this.contentStream = contentStream; + this.topic = topic; + this.lastAckedMessageIndex = lastAckedMessageIndex; + } + + @Override + public String toString() { + return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'"; + } + + int getLastAckedMessageIndex() { + return this.lastAckedMessageIndex; + } + + int getMaxRequestSize() { + return this.maxRequestSize; + } + + byte[] getKeyBytes() { + return this.keyBytes; + } + + Integer getPartitionId() { + return partitionId; + } + + public void setPartitionId(Integer partitionId) { + this.partitionId = partitionId; + } + + byte[] getDelimiterBytes() { + return this.delimiterBytes; + } + + InputStream getContentStream() { + return this.contentStream; + } + + String getTopic() { + return this.topic; + } + + void setKeyBytes(byte[] keyBytes) { + if (this.keyBytes == null) { + if (keyBytes != null) { + this.assertBytesValid(keyBytes); + this.keyBytes = keyBytes; + } + } else { + throw new IllegalArgumentException("'keyBytes' can only be set once per instance"); + } + } + + void setDelimiterBytes(byte[] delimiterBytes) { + if (this.delimiterBytes == null) { + if (delimiterBytes != null) { + this.assertBytesValid(delimiterBytes); + this.delimiterBytes = delimiterBytes; + } + } else { + throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance"); + } + } + + void setMaxRequestSize(int maxRequestSize) { + if (!this.maxRequestSizeSet) { + if (maxRequestSize > 0) { + this.maxRequestSize 
= maxRequestSize; + this.maxRequestSizeSet = true; + } else { + throw new IllegalArgumentException("'maxRequestSize' must be > 0"); + } + } else { + throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance"); + } + } + + private void assertBytesValid(byte[] bytes) { + if (bytes != null) { + if (bytes.length == 0) { + throw new IllegalArgumentException("'bytes' must not be empty"); + } + } + } + + private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) { + if (contentStream == null) { + throw new IllegalArgumentException("'contentStream' must not be null"); + } else if (topic == null || topic.trim().length() == 0) { + throw new IllegalArgumentException("'topic' must not be null or empty"); + } else if (lastAckedMessageIndex < -1) { + throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1"); + } + } +}
