Repository: nifi Updated Branches: refs/heads/support/nifi-0.6.x 58417c006 -> 15bab7dda
NIFI-1701 fixed merge conflicts Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/15bab7dd Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/15bab7dd Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/15bab7dd Branch: refs/heads/support/nifi-0.6.x Commit: 15bab7dda858f6f20c73a7b729962bb7c1f130b9 Parents: 58417c0 Author: Oleg Zhurakousky <[email protected]> Authored: Mon Apr 4 13:19:24 2016 -0400 Committer: Oleg Zhurakousky <[email protected]> Committed: Mon Apr 4 13:19:24 2016 -0400 ---------------------------------------------------------------------- .../apache/nifi/processors/kafka/PutKafka.java | 5 ----- .../kafka/SplittableMessageContext.java | 23 ++++++++++---------- 2 files changed, 12 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/15bab7dd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 444b251..a9fe10c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -410,13 +410,8 @@ public class PutKafka extends AbstractProcessor { String failedSegmentsString = flowFile.getAttribute(ATTR_FAILED_SEGMENTS); if (flowFile.getAttribute(ATTR_PROC_ID) != null && flowFile.getAttribute(ATTR_PROC_ID).equals(this.getIdentifier()) && failedSegmentsString != null) { topicName = flowFile.getAttribute(ATTR_TOPIC); -<<<<<<< HEAD - key = flowFile.getAttribute(ATTR_KEY).getBytes(); - delimiterPattern = flowFile.getAttribute(ATTR_DELIMITER); -======= key = flowFile.getAttribute(ATTR_KEY) == null ? null : flowFile.getAttribute(ATTR_KEY).getBytes(); delimiterBytes = flowFile.getAttribute(ATTR_DELIMITER) != null ? flowFile.getAttribute(ATTR_DELIMITER).getBytes(StandardCharsets.UTF_8) : null; ->>>>>>> 25290ce... NIFI-1701 fixed StreamScanner, added more tests } else { failedSegmentsString = null; topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); http://git-wip-us.apache.org/repos/asf/nifi/blob/15bab7dd/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java index dcbec8e..d5f1c0b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java @@ -45,14 +45,10 @@ final class SplittableMessageContext { * byte array representing bytes by which the data will be * delimited. Can be null. */ -<<<<<<< HEAD - SplittableMessageContext(String topicName, byte[] keyBytes, String delimiterPattern) { -======= SplittableMessageContext(String topicName, byte[] keyBytes, byte[] delimiterBytes) { if (topicName == null || topicName.trim().length() == 0){ throw new IllegalArgumentException("'topicName' must not be null or empty"); } ->>>>>>> 25290ce... NIFI-1701 fixed StreamScanner, added more tests this.topicName = topicName; this.keyBytes = keyBytes; this.delimiterBytes = delimiterBytes != null ? delimiterBytes : null; @@ -68,20 +64,25 @@ final class SplittableMessageContext { } /** - * + * Will set failed segments from an array of integers */ void setFailedSegments(int... failedSegments) { - this.failedSegments = new BitSet(); - for (int failedSegment : failedSegments) { - this.failedSegments.set(failedSegment); + if (failedSegments != null) { + this.failedSegments = new BitSet(); + for (int failedSegment : failedSegments) { + this.failedSegments.set(failedSegment); + } } } /** - * + * Will set failed segments from an array of bytes that will be used to + * construct the final {@link BitSet} representing failed segments */ void setFailedSegmentsAsByteArray(byte[] failedSegments) { - this.failedSegments = BitSet.valueOf(failedSegments); + if (failedSegments != null) { + this.failedSegments = BitSet.valueOf(failedSegments); + } } /** @@ -110,7 +111,7 @@ final class SplittableMessageContext { * Returns the key bytes as String */ String getKeyBytesAsString() { - return new String(this.keyBytes); + return this.keyBytes != null ? new String(this.keyBytes) : null; } /**
