http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java deleted file mode 100644 index 4677e33..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.io.Closeable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Pattern; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.PropertyDescriptor.Builder; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Processor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.ssl.SSLContextService; -import org.apache.nifi.util.FormatUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for implementing {@link Processor}s to publish and consume - * messages to/from Kafka - * - * @see PublishKafka - * @see ConsumeKafka - */ -abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor { - - final Logger logger = LoggerFactory.getLogger(this.getClass()); - - private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; - - private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - - - static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT"); - static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL"); - static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT"); - static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL"); - - static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() - .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) - .displayName("Kafka Brokers") - .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) - .expressionLanguageSupported(true) - .defaultValue("localhost:9092") - .build(); - static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder() - .name(ProducerConfig.CLIENT_ID_CONFIG) - .displayName("Client ID") - .description("String value uniquely identifying this client application. Corresponds to Kafka's 'client.id' property.") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder() - .name("security.protocol") - .displayName("Security Protocol") - .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.") - .required(false) - .expressionLanguageSupported(false) - .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL) - .defaultValue(SEC_PLAINTEXT.getValue()) - .build(); - static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder() - .name("sasl.kerberos.service.name") - .displayName("Kerberos Service Name") - .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. " - + "Corresponds to Kafka's 'security.protocol' property." - + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.") - .required(false) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - - static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("topic") - .displayName("Topic Name") - .description("The name of the Kafka Topic") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - - static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("ssl.context.service") - .displayName("SSL Context Service") - .description("Specifies the SSL Context Service to use for communicating with Kafka.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); - - static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder() - .name("message-demarcator") - .displayName("Message Demarcator") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true); - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles that are the are successfully sent to or received from Kafka are routed to this relationship") - .build(); - - static final List<PropertyDescriptor> SHARED_DESCRIPTORS = new ArrayList<>(); - - static final Set<Relationship> SHARED_RELATIONSHIPS = new HashSet<>(); - - private final AtomicInteger taskCounter = new AtomicInteger(); - - private volatile boolean acceptTask = true; - - static { - SHARED_DESCRIPTORS.add(BOOTSTRAP_SERVERS); - SHARED_DESCRIPTORS.add(TOPIC); - SHARED_DESCRIPTORS.add(CLIENT_ID); - SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL); - SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE); - SHARED_DESCRIPTORS.add(SSL_CONTEXT_SERVICE); - - SHARED_RELATIONSHIPS.add(REL_SUCCESS); - } - - /** - * Instance of {@link KafkaPublisher} or {@link KafkaConsumer} - */ - volatile T kafkaResource; - - /** - * This thread-safe operation will delegate to - * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first - * checking and creating (if necessary) Kafka resource which could be either - * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and - * destroy the underlying Kafka resource upon catching an {@link Exception} - * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}. - * After Kafka resource is destroyed it will be re-created upon the next - * invocation of this operation essentially providing a self healing mechanism - * to deal with potentially corrupted resource. - * <p> - * Keep in mind that upon catching an exception the state of this processor - * will be set to no longer accept any more tasks, until Kafka resource is reset. - * This means that in a multi-threaded situation currently executing tasks will - * be given a chance to complete while no new tasks will be accepted. - */ - @Override - public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaResource' can be reset before new tasks are accepted. - this.taskCounter.incrementAndGet(); - final ProcessSession session = sessionFactory.createSession(); - try { - /* - * We can't be doing double null check here since as a pattern - * it only works for lazy init but not reset, which is what we - * are doing here. In fact the first null check is dangerous - * since 'kafkaResource' can become null right after its null - * check passed causing subsequent NPE. - */ - synchronized (this) { - if (this.kafkaResource == null) { - this.kafkaResource = this.buildKafkaResource(context, session); - } - } - - /* - * The 'processed' boolean flag does not imply any failure or success. It simply states that: - * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated - * - PublishKafka - some messages were sent to Kafka based on existence of the input FlowFile - */ - boolean processed = this.rendezvousWithKafka(context, session); - session.commit(); - if (processed) { - this.postCommit(context); - } else { - context.yield(); - } - } catch (Throwable e) { - this.acceptTask = false; - session.rollback(true); - this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[] { this, e }); - } finally { - synchronized (this) { - if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) { - this.close(); - this.acceptTask = true; - } - } - } - } else { - this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset."); - this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset."); - context.yield(); - } - } - - /** - * Will call {@link Closeable#close()} on the target resource after which - * the target resource will be set to null. Should only be called when there - * are no more threads being executed on this processor or when it has been - * verified that only a single thread remains. - * - * @see KafkaPublisher - * @see KafkaConsumer - */ - @OnStopped - public void close() { - try { - if (this.kafkaResource != null) { - try { - this.kafkaResource.close(); - } catch (Exception e) { - this.getLogger().warn("Failed while closing " + this.kafkaResource, e); - } - } - } finally { - this.kafkaResource = null; - } - } - - /** - * - */ - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") - .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) - .build(); - } - - /** - * This operation is called from - * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} method and - * contains main processing logic for this Processor. - */ - protected abstract boolean rendezvousWithKafka(ProcessContext context, ProcessSession session); - - /** - * Builds target resource for interacting with Kafka. The target resource - * could be one of {@link KafkaPublisher} or {@link KafkaConsumer} - */ - protected abstract T buildKafkaResource(ProcessContext context, ProcessSession session); - - /** - * This operation will be executed after {@link ProcessSession#commit()} has - * been called. - */ - protected void postCommit(ProcessContext context) { - // no op - } - - /** - * - */ - @Override - protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { - List<ValidationResult> results = new ArrayList<>(); - - String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue(); - - /* - * validates that if one of SASL (Kerberos) option is selected for - * security protocol, then Kerberos principal is provided as well - */ - if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)){ - String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue(); - if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0){ - results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false) - .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <" - + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '" - + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.") - .build()); - } - } - - String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()) - .getValue(); - if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) { - results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) - .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build()); - } - String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()) - .getValue(); - if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) { - results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) - .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build()); - } - String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()) - .getValue(); - if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) { - results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) - .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build()); - } - String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()) - .getValue(); - if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) { - results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) - .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build()); - } - - return results; - } - - /** - * Builds transit URI for provenance event. The transit URI will be in the - * form of <security.protocol>://<bootstrap.servers>/topic - */ - String buildTransitURI(String securityProtocol, String brokers, String topic) { - StringBuilder builder = new StringBuilder(); - builder.append(securityProtocol); - builder.append("://"); - builder.append(brokers); - builder.append("/"); - builder.append(topic); - return builder.toString(); - } - - /** - * Builds Kafka {@link Properties} - */ - Properties buildKafkaProperties(ProcessContext context) { - Properties properties = new Properties(); - for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) { - if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) { - // Translate SSLContext Service configuration into Kafka properties - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - buildSSLKafkaProperties(sslContextService, properties); - continue; - } - - String pName = propertyDescriptor.getName(); - String pValue = propertyDescriptor.isExpressionLanguageSupported() - ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue() - : context.getProperty(propertyDescriptor).getValue(); - if (pValue != null) { - if (pName.endsWith(".ms")) { // kafka standard time notation - pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS)); - } - properties.setProperty(pName, pValue); - } - } - return properties; - } - - private void buildSSLKafkaProperties(final SSLContextService sslContextService, final Properties properties) { - if (sslContextService == null) { - return; - } - - if (sslContextService.isKeyStoreConfigured()) { - properties.setProperty("ssl.keystore.location", sslContextService.getKeyStoreFile()); - properties.setProperty("ssl.keystore.password", sslContextService.getKeyStorePassword()); - final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword(); - properties.setProperty("ssl.key.password", keyPass); - properties.setProperty("ssl.keystore.type", sslContextService.getKeyStoreType()); - } - - if (sslContextService.isTrustStoreConfigured()) { - properties.setProperty("ssl.truststore.location", sslContextService.getTrustStoreFile()); - properties.setProperty("ssl.truststore.password", sslContextService.getTrustStorePassword()); - properties.setProperty("ssl.truststore.type", sslContextService.getTrustStoreType()); - } - } -}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java deleted file mode 100644 index ac5b4c5..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; - -@InputRequirement(Requirement.INPUT_FORBIDDEN) -@CapabilityDescription("Consumes messages from Apache Kafka") -@Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" }) -public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]>> { - - static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset"); - - static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset"); - - static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group"); - - static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() - .name(ConsumerConfig.GROUP_ID_CONFIG) - .displayName("Group ID") - .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") - .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() - .name(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) - .displayName("Offset Reset") - .description("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any " - + "more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.") - .required(true) - .allowableValues(OFFSET_EARLIEST, OFFSET_LATEST, OFFSET_NONE) - .defaultValue(OFFSET_LATEST.getValue()) - .build(); - static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER - .description("Since KafkaConsumer receives messages in batches, you have an option to output a single FlowFile which contains " - + "all Kafka messages in a single batch and this property allows you to provide a string (interpreted as UTF-8) to use " - + "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received " - + "in a batch will result in a single FlowFile which essentially means that this processor may output multiple FlowFiles for each " - + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS") - .build(); - - - static final List<PropertyDescriptor> DESCRIPTORS; - - static final Set<Relationship> RELATIONSHIPS; - - private volatile byte[] demarcatorBytes; - - private volatile String topic; - - private volatile String brokers; - - /* - * Will ensure that the list of the PropertyDescriptors is build only once, - * since all other lifecycle methods are invoked multiple times. - */ - static { - List<PropertyDescriptor> _descriptors = new ArrayList<>(); - _descriptors.addAll(SHARED_DESCRIPTORS); - _descriptors.add(GROUP_ID); - _descriptors.add(AUTO_OFFSET_RESET); - _descriptors.add(MESSAGE_DEMARCATOR); - DESCRIPTORS = Collections.unmodifiableList(_descriptors); - - RELATIONSHIPS = Collections.unmodifiableSet(SHARED_RELATIONSHIPS); - } - - /** - * - */ - @Override - public Set<Relationship> getRelationships() { - return RELATIONSHIPS; - } - - /** - * Will unsubscribe form {@link KafkaConsumer} delegating to 'super' to do - * the rest. - */ - @Override - @OnStopped - public void close() { - if (this.kafkaResource != null) { - try { - this.kafkaResource.unsubscribe(); - } finally { // in the event the above fails - super.close(); - } - } - } - - /** - * - */ - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return DESCRIPTORS; - } - - /** - * Will rendezvous with Kafka by performing the following: - * <br> - * - poll {@link ConsumerRecords} from {@link KafkaConsumer} in a - * non-blocking manner, signaling yield if no records were received from - * Kafka - * <br> - * - if records were received form Kafka, the are written to a newly created - * {@link FlowFile}'s {@link OutputStream} using a provided demarcator (see - * {@link #MESSAGE_DEMARCATOR} - */ - @Override - protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession processSession) { - ConsumerRecords<byte[], byte[]> consumedRecords = this.kafkaResource.poll(100); - if (consumedRecords != null && !consumedRecords.isEmpty()) { - long start = System.nanoTime(); - FlowFile flowFile = processSession.create(); - final AtomicInteger messageCounter = new AtomicInteger(); - final Map<String, String> kafkaAttributes = new HashMap<>(); - - final Iterator<ConsumerRecord<byte[], byte[]>> iter = consumedRecords.iterator(); - while (iter.hasNext()){ - flowFile = processSession.append(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - ConsumerRecord<byte[], byte[]> consumedRecord = iter.next(); - - kafkaAttributes.put("kafka.offset", String.valueOf(consumedRecord.offset())); - if (consumedRecord.key() != null) { - kafkaAttributes.put("kafka.key", new String(consumedRecord.key(), StandardCharsets.UTF_8)); - } - kafkaAttributes.put("kafka.partition", String.valueOf(consumedRecord.partition())); - kafkaAttributes.put("kafka.topic", consumedRecord.topic()); - - if (messageCounter.getAndIncrement() > 0 && ConsumeKafka.this.demarcatorBytes != null) { - out.write(ConsumeKafka.this.demarcatorBytes); - } - out.write(consumedRecord.value()); - } - }); - - flowFile = processSession.putAllAttributes(flowFile, kafkaAttributes); - /* - * Release FlowFile if there are more messages in the - * ConsumerRecords batch and no demarcator was provided, - * otherwise the FlowFile will be released as soon as this loop - * exits. - */ - if (iter.hasNext() && ConsumeKafka.this.demarcatorBytes == null){ - this.releaseFlowFile(flowFile, context, processSession, start, messageCounter.get()); - flowFile = processSession.create(); - messageCounter.set(0); - } - } - this.releaseFlowFile(flowFile, context, processSession, start, messageCounter.get()); - } - return consumedRecords != null && !consumedRecords.isEmpty(); - } - - /** - * This operation is called from - * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} method after - * the process session is committed so that then kafka offset changes can be - * committed. This can mean in cases of really bad timing we could have data - * duplication upon recovery but not data loss. We want to commit the flow - * files in a NiFi sense before we commit them in a Kafka sense. - */ - @Override - protected void postCommit(ProcessContext context) { - this.kafkaResource.commitSync(); - } - - /** - * Builds and instance of {@link KafkaConsumer} and subscribes to a provided - * topic. - */ - @Override - protected Consumer<byte[], byte[]> buildKafkaResource(ProcessContext context, ProcessSession session) { - this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() - ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8) - : null; - this.topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue(); - this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - Properties kafkaProperties = this.buildKafkaProperties(context); - - /* - * Since we are using unconventional way to validate if connectivity to - * broker is possible we need a mechanism to be able to disable it. - * 'check.connection' property will serve as such mechanism - */ - if (!"false".equals(kafkaProperties.get("check.connection"))) { - this.checkIfInitialConnectionPossible(); - } - - kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - - KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties); - consumer.subscribe(Collections.singletonList(this.topic)); - return consumer; - } - - /** - * Checks via brute force if it is possible to establish connection to at - * least one broker. If not this method will throw {@link ProcessException}. - */ - private void checkIfInitialConnectionPossible(){ - String[] br = this.brokers.split(","); - boolean connectionPossible = false; - for (int i = 0; i < br.length && !connectionPossible; i++) { - String hostPortCombo = br[i]; - String[] hostPort = hostPortCombo.split(":"); - Socket client = null; - try { - client = new Socket(); - client.connect(new InetSocketAddress(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim())), 10000); - connectionPossible = true; - } catch (Exception e) { - this.logger.error("Connection to '" + hostPortCombo + "' is not possible", e); - } finally { - try { - client.close(); - } catch (IOException e) { - // ignore - } - } - } - if (!connectionPossible){ - throw new ProcessException("Connection to " + this.brokers + " is not possible. See logs for more details"); - } - } - /** - * Will release flow file. Releasing of the flow file in the context of this - * operation implies the following: - * - * If Empty then remove from session and return If has something then - * transfer to {@link #REL_SUCCESS} - */ - private void releaseFlowFile(FlowFile flowFile, ProcessContext context, ProcessSession session, long start, int msgCount) { - long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - String transitUri = this.buildTransitURI(context.getProperty(SECURITY_PROTOCOL).getValue(), this.brokers, topic); - session.getProvenanceReporter().receive(flowFile, transitUri, "Received " + msgCount + " Kafka messages", executionDuration); - this.getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] { flowFile, msgCount, executionDuration }); - session.transfer(flowFile, REL_SUCCESS); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java deleted file mode 100644 index e2cdea2..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.nifi.logging.ComponentLog; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -/** - * A pool of Kafka Consumers for a given topic. Clients must create the ConsumerPool and call initialize() before - * acquiring consumers. Consumers should be returned by calling returnConsumerResource. - */ -public class ConsumerPool implements Closeable { - - private final int size; - private final BlockingQueue<ConsumerResource> consumers; - private final String topic; - private final Properties kafkaProperties; - private final ComponentLog logger; - private boolean initialized = false; - - /** - * Initializes the pool with the given size, topic, properties, and logger, but does not create any consumers until initialize() is called. - * - * @param size the number of consumers to pool - * @param topic the topic to consume from - * @param kafkaProperties the properties for each consumer - * @param logger the logger to report any errors/warnings - */ - public ConsumerPool(final int size, final String topic, final Properties kafkaProperties, final ComponentLog logger) { - this.size = size; - this.logger = logger; - this.topic = topic; - this.kafkaProperties = kafkaProperties; - this.consumers = new LinkedBlockingQueue<>(size); - } - - /** - * Creates the consumers and subscribes them to the given topic. This method must be called before - * acquiring any consumers. - */ - public synchronized void initialize() { - if (initialized) { - return; - } - - for (int i=0; i < size; i++) { - ConsumerResource resource = createConsumerResource(); - consumers.offer(resource); - } - - initialized = true; - } - - /** - * @return a ConsumerResource from the pool, or a newly created ConsumerResource if none were available in the pool - * @throws IllegalStateException if attempting to get a consumer before calling initialize() - */ - public synchronized ConsumerResource getConsumerResource() { - if (!initialized) { - throw new IllegalStateException("ConsumerPool must be initialized before acquiring consumers"); - } - - ConsumerResource consumerResource = consumers.poll(); - if (consumerResource == null) { - consumerResource = createConsumerResource(); - } - return consumerResource; - } - - /** - * If the given ConsumerResource has been poisoned then it is closed and not returned to the pool, - * otherwise it is attempted to be returned to the pool. If the pool is already full then it is closed - * and not returned. - * - * @param consumerResource - */ - public synchronized void returnConsumerResource(final ConsumerResource consumerResource) { - if (consumerResource == null) { - return; - } - - if (consumerResource.isPoisoned()) { - closeConsumer(consumerResource.getConsumer()); - } else { - boolean added = consumers.offer(consumerResource); - if (!added) { - closeConsumer(consumerResource.getConsumer()); - } - } - } - - /** - * Closes all ConsumerResources in the pool and resets the initialization state of this pool. - * - * @throws IOException should never throw - */ - @Override - public synchronized void close() throws IOException { - ConsumerResource consumerResource; - while ((consumerResource = consumers.poll()) != null) { - closeConsumer(consumerResource.getConsumer()); - } - initialized = false; - } - - private ConsumerResource createConsumerResource() { - final Consumer<byte[],byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProperties); - kafkaConsumer.subscribe(Collections.singletonList(this.topic)); - return new ConsumerResource(kafkaConsumer, this, logger); - } - - private void closeConsumer(Consumer consumer) { - try { - consumer.unsubscribe(); - } catch (Exception e) { - logger.warn("Failed while unsubscribing " + consumer, e); - } - - try { - consumer.close(); - } catch (Exception e) { - logger.warn("Failed while closing " + consumer, e); - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java deleted file mode 100644 index baaf39f..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerResource.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.nifi.logging.ComponentLog; - -import java.io.Closeable; -import java.io.IOException; - -/** - * A wrapper for a Kafka Consumer obtained from a ConsumerPool. Client's should call poison() to indicate that this - * consumer should no longer be used by other clients, and should always call close(). Calling close() will pass - * this consumer back to the pool and the pool will determine the appropriate handling based on whether the consumer - * has been poisoned and whether the pool is already full. - */ -public class ConsumerResource implements Closeable { - - private final ComponentLog logger; - private final Consumer<byte[],byte[]> consumer; - private final ConsumerPool consumerPool; - private boolean poisoned = false; - - /** - * @param consumer the Kafka Consumer - * @param consumerPool the ConsumerPool this ConsumerResource was obtained from - * @param logger the logger to report any errors/warnings - */ - public ConsumerResource(Consumer<byte[], byte[]> consumer, ConsumerPool consumerPool, ComponentLog logger) { - this.logger = logger; - this.consumer = consumer; - this.consumerPool = consumerPool; - } - - /** - * @return the Kafka Consumer for this - */ - public Consumer<byte[],byte[]> getConsumer() { - return consumer; - } - - /** - * Sets the poison flag for this consumer to true. - */ - public void poison() { - poisoned = true; - } - - /** - * @return true if this consumer has been poisoned, false otherwise - */ - public boolean isPoisoned() { - return poisoned; - } - - @Override - public void close() throws IOException { - consumerPool.returnConsumerResource(this); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java deleted file mode 100644 index f684bfa..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.io.Closeable; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.stream.io.util.StreamDemarcator; - -/** - * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor - * with sending contents of the {@link FlowFile}s to Kafka. - */ -class KafkaPublisher implements Closeable { - private final Producer<byte[], byte[]> kafkaProducer; - - private volatile long ackWaitTime = 30000; - - private final ComponentLog componentLog; - - private final int ackCheckSize; - - KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) { - this(kafkaProperties, 100, componentLog); - } - - /** - * Creates an instance of this class as well as the instance of the - * corresponding Kafka {@link KafkaProducer} using provided Kafka - * configuration properties. - * - * @param kafkaProperties - * instance of {@link Properties} used to bootstrap - * {@link KafkaProducer} - */ - KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) { - this.kafkaProducer = new KafkaProducer<>(kafkaProperties); - this.ackCheckSize = ackCheckSize; - this.componentLog = componentLog; - } - - /** - * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to - * determine how many messages to Kafka will be sent from a provided - * {@link InputStream} (see {@link PublishingContext#getContentStream()}). - * It supports two publishing modes: - * <ul> - * <li>Sending all messages constructed from - * {@link StreamDemarcator#nextToken()} operation.</li> - * <li>Sending only unacknowledged messages constructed from - * {@link StreamDemarcator#nextToken()} operation.</li> - * </ul> - * The unacknowledged messages are determined from the value of - * {@link PublishingContext#getLastAckedMessageIndex()}. - * <br> - * This method assumes content stream affinity where it is expected that the - * content stream that represents the same Kafka message(s) will remain the - * same across possible retries. This is required specifically for cases - * where delimiter is used and a single content stream may represent - * multiple Kafka messages. The - * {@link PublishingContext#getLastAckedMessageIndex()} will provide the - * index of the last ACKed message, so upon retry only messages with the - * higher index are sent. - * - * @param publishingContext - * instance of {@link PublishingContext} which hold context - * information about the message(s) to be sent. - * @return The index of the last successful offset. - */ - KafkaPublisherResult publish(PublishingContext publishingContext) { - StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(), - publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize()); - - int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex(); - List<Future<RecordMetadata>> resultFutures = new ArrayList<>(); - - byte[] messageBytes; - int tokenCounter = 0; - boolean continueSending = true; - KafkaPublisherResult result = null; - for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) { - if (prevLastAckedMessageIndex < tokenCounter) { - ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes); - resultFutures.add(this.kafkaProducer.send(message)); - - if (tokenCounter % this.ackCheckSize == 0){ - int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); - resultFutures.clear(); - if (lastAckedMessageIndex % this.ackCheckSize != 0) { - continueSending = false; - result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); - } - prevLastAckedMessageIndex = lastAckedMessageIndex; - } - } - } - - if (result == null) { - int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex); - resultFutures.clear(); - result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex); - } - return result; - } - - /** - * Sets the time this publisher will wait for the {@link Future#get()} - * operation (the Future returned by - * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing - * out. - * - * This value will also be used as a timeout when closing the underlying - * {@link KafkaProducer}. See {@link #close()}. - */ - void setAckWaitTime(long ackWaitTime) { - this.ackWaitTime = ackWaitTime; - } - - /** - * This operation will process ACKs from Kafka in the order in which - * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning - * the index of the last ACKed message. Within this operation processing ACK - * simply means successful invocation of 'get()' operation on the - * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)} - * operation. Upon encountering any type of error while interrogating such - * {@link Future} the ACK loop will end. Messages that were not ACKed would - * be considered non-delivered and therefore could be resent at the later - * time. - * - * @param sendFutures - * list of {@link Future}s representing results of publishing to - * Kafka - * - * @param lastAckMessageIndex - * the index of the last ACKed message. It is important to - * provide the last ACKed message especially while re-trying so - * the proper index is maintained. - */ - private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) { - boolean exceptionThrown = false; - for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) { - Future<RecordMetadata> future = sendFutures.get(segmentCounter); - try { - future.get(this.ackWaitTime, TimeUnit.MILLISECONDS); - lastAckMessageIndex++; - } catch (InterruptedException e) { - exceptionThrown = true; - Thread.currentThread().interrupt(); - this.warnOrError("Interrupted while waiting for acks from Kafka", null); - } catch (ExecutionException e) { - exceptionThrown = true; - this.warnOrError("Failed while waiting for acks from Kafka", e); - } catch (TimeoutException e) { - exceptionThrown = true; - this.warnOrError("Timed out while waiting for acks from Kafka", null); - } - } - - return lastAckMessageIndex; - } - - /** - * Will close the underlying {@link KafkaProducer} waiting if necessary for - * the same duration as supplied {@link #setAckWaitTime(long)} - */ - @Override - public void close() { - this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS); - } - - /** - * - */ - private void warnOrError(String message, Exception e) { - if (e == null) { - this.componentLog.warn(message); - } else { - this.componentLog.error(message, e); - } - } - - /** - * Encapsulates the result received from publishing messages to Kafka - */ - static class KafkaPublisherResult { - private final int messagesSent; - private final int lastMessageAcked; - KafkaPublisherResult(int messagesSent, int lastMessageAcked) { - this.messagesSent = messagesSent; - this.lastMessageAcked = lastMessageAcked; - } - - public int getMessagesSent() { - return this.messagesSent; - } - - public int getLastMessageAcked() { - return this.lastMessageAcked; - } - - public boolean isAllAcked() { - return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked; - } - - @Override - public String toString() { - return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java deleted file mode 100644 index 8c948df..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.util.Map; - -import org.apache.kafka.clients.producer.Partitioner; -import org.apache.kafka.common.Cluster; - -/** - * Collection of implementation of common Kafka {@link Partitioner}s. - */ -final public class Partitioners { - - private Partitioners() { - } - - /** - * {@link Partitioner} that implements 'round-robin' mechanism which evenly - * distributes load between all available partitions. - */ - public static class RoundRobinPartitioner implements Partitioner { - private volatile int index; - - @Override - public void configure(Map<String, ?> configs) { - // noop - } - - @Override - public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { - return this.next(cluster.availablePartitionsForTopic(topic).size()); - } - - @Override - public void close() { - // noop - } - - private synchronized int next(int numberOfPartitions) { - if (this.index >= numberOfPartitions) { - this.index = 0; - } - return index++; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java deleted file mode 100644 index 6703c04..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ /dev/null @@ -1,360 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult; -import org.apache.nifi.processors.kafka.pubsub.Partitioners.RoundRobinPartitioner; - -@InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) -@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a " - + "user-specified delimiter, such as a new-line.") -@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", - description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." - + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." - + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") -public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> { - - protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id"; - - protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx"; - - protected static final String FAILED_TOPIC_ATTR = "failed.topic"; - - protected static final String FAILED_KEY_ATTR = "failed.key"; - - protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter"; - - protected static final String MSG_COUNT = "msg.count"; - - static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", - "FlowFile will be routed to failure unless the message is replicated to the appropriate " - + "number of Kafka Nodes according to the Topic configuration"); - static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", - "FlowFile will be routed to success if the message is received by a single Kafka node, " - + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> " - + "but can result in data loss if a Kafka node crashes"); - static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", - "FlowFile will be routed to success after successfully writing the content to a Kafka node, " - + "without waiting for a response. This provides the best performance but may result in data loss."); - - static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(RoundRobinPartitioner.class.getName(), - RoundRobinPartitioner.class.getSimpleName(), - "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " - + "the next Partition to Partition 2, and so on, wrapping as necessary."); - static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", - "DefaultPartitioner", "Messages will be assigned to random partitions."); - - static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() - .name(ProducerConfig.ACKS_CONFIG) - .displayName("Delivery Guarantee") - .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.") - .required(true) - .expressionLanguageSupported(false) - .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) - .defaultValue(DELIVERY_BEST_EFFORT.getValue()) - .build(); - static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder() - .name(ProducerConfig.MAX_BLOCK_MS_CONFIG) - .displayName("Meta Data Wait Time") - .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the " - + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(true) - .defaultValue("30 sec") - .build(); - static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() - .name("kafka-key") - .displayName("Kafka Key") - .description("The Key to use for the Message") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - static final PropertyDescriptor MESSAGE_DEMARCATOR = MESSAGE_DEMARCATOR_BUILDER - .description("Specifies the string (interpreted as UTF-8) to use for demarcating apart multiple messages within " - + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the " - + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. " - + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on your OS.") - .build(); - static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder() - .name(ProducerConfig.PARTITIONER_CLASS_CONFIG) - .displayName("Partitioner class") - .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.") - .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING) - .defaultValue(RANDOM_PARTITIONING.getValue()) - .required(false) - .build(); - static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name(ProducerConfig.COMPRESSION_TYPE_CONFIG) - .displayName("Compression Type") - .description("This parameter allows you to specify the compression codec for all data generated by this producer.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues("none", "gzip", "snappy", "lz4") - .defaultValue("none") - .build(); - - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") - .build(); - - static final List<PropertyDescriptor> DESCRIPTORS; - - static final Set<Relationship> RELATIONSHIPS; - - private volatile String brokers; - - /* - * Will ensure that list of PropertyDescriptors is build only once, since - * all other lifecycle methods are invoked multiple times. - */ - static { - List<PropertyDescriptor> _descriptors = new ArrayList<>(); - _descriptors.addAll(SHARED_DESCRIPTORS); - _descriptors.add(DELIVERY_GUARANTEE); - _descriptors.add(KEY); - _descriptors.add(MESSAGE_DEMARCATOR); - _descriptors.add(META_WAIT_TIME); - _descriptors.add(PARTITION_CLASS); - _descriptors.add(COMPRESSION_CODEC); - - DESCRIPTORS = Collections.unmodifiableList(_descriptors); - - Set<Relationship> _relationships = new HashSet<>(); - _relationships.addAll(SHARED_RELATIONSHIPS); - _relationships.add(REL_FAILURE); - RELATIONSHIPS = Collections.unmodifiableSet(_relationships); - } - - /** - * - */ - @Override - public Set<Relationship> getRelationships() { - return RELATIONSHIPS; - } - - /** - * - */ - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return DESCRIPTORS; - } - - /** - * Will rendezvous with Kafka if {@link ProcessSession} contains {@link FlowFile} - * producing a result {@link FlowFile}. - * <br> - * The result {@link FlowFile} that is successful is then transfered to {@link #REL_SUCCESS} - * <br> - * The result {@link FlowFile} that is failed is then transfered to {@link #REL_FAILURE} - * - */ - @Override - protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session){ - FlowFile flowFile = session.get(); - if (flowFile != null) { - long start = System.nanoTime(); - flowFile = this.doRendezvousWithKafka(flowFile, context, session); - Relationship relationship = REL_SUCCESS; - if (!this.isFailedFlowFile(flowFile)) { - String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); - long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - String transitUri = this.buildTransitURI(context.getProperty(SECURITY_PROTOCOL).getValue(), this.brokers, topic); - session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration); - this.getLogger().info("Successfully sent {} to Kafka as {} message(s) in {} millis", new Object[] { flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration }); - } else { - relationship = REL_FAILURE; - flowFile = session.penalize(flowFile); - } - session.transfer(flowFile, relationship); - } - return flowFile != null; - } - - /** - * Builds and instance of {@link KafkaPublisher}. - */ - @Override - protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) { - Properties kafkaProperties = this.buildKafkaProperties(context); - kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue(); - KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger()); - return publisher; - } - - /** - * Will rendezvous with {@link KafkaPublisher} after building - * {@link PublishingContext} and will produce the resulting {@link FlowFile}. - * The resulting FlowFile contains all required information to determine - * if message publishing originated from the provided FlowFile has actually - * succeeded fully, partially or failed completely (see - * {@link #isFailedFlowFile(FlowFile)}. - */ - private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) { - final AtomicReference<KafkaPublisherResult> publishResultRef = new AtomicReference<>(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(InputStream contentStream) throws IOException { - PublishingContext publishingContext = PublishKafka.this.buildPublishingContext(flowFile, context, contentStream); - KafkaPublisherResult result = PublishKafka.this.kafkaResource.publish(publishingContext); - publishResultRef.set(result); - } - }); - - FlowFile resultFile = publishResultRef.get().isAllAcked() - ? this.cleanUpFlowFileIfNecessary(flowFile, session) - : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context)); - - if (!this.isFailedFlowFile(resultFile)) { - resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent())); - } - return resultFile; - } - - /** - * Builds {@link PublishingContext} for message(s) to be sent to Kafka. - * {@link PublishingContext} contains all contextual information required by - * {@link KafkaPublisher} to publish to Kafka. Such information contains - * things like topic name, content stream, delimiter, key and last ACKed - * message for cases where provided FlowFile is being retried (failed in the - * past). - * <br> - * For the clean FlowFile (file that has been sent for the first time), - * PublishingContext will be built form {@link ProcessContext} associated - * with this invocation. - * <br> - * For the failed FlowFile, {@link PublishingContext} will be built from - * attributes of that FlowFile which by then will already contain required - * information (e.g., topic, key, delimiter etc.). This is required to - * ensure the affinity of the retry in the even where processor - * configuration has changed. However keep in mind that failed FlowFile is - * only considered a failed FlowFile if it is being re-processed by the same - * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see - * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to - * another PublishKafka processor it is treated as a fresh FlowFile - * regardless if it has #FAILED* attributes set. - */ - private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) { - String topicName; - byte[] keyBytes; - byte[] delimiterBytes = null; - int lastAckedMessageIndex = -1; - if (this.isFailedFlowFile(flowFile)) { - lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX)); - topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR); - keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null - ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null; - delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null - ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null; - - } else { - topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); - String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8); - delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR) - .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null; - } - - PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex); - publishingContext.setKeyBytes(keyBytes); - publishingContext.setDelimiterBytes(delimiterBytes); - return publishingContext; - } - - /** - * Will remove FAILED_* attributes if FlowFile is no longer considered a - * failed FlowFile - * - * @see #isFailedFlowFile(FlowFile) - */ - private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) { - if (this.isFailedFlowFile(flowFile)) { - Set<String> keysToRemove = new HashSet<>(); - keysToRemove.add(FAILED_DELIMITER_ATTR); - keysToRemove.add(FAILED_KEY_ATTR); - keysToRemove.add(FAILED_TOPIC_ATTR); - keysToRemove.add(FAILED_PROC_ID_ATTR); - keysToRemove.add(FAILED_LAST_ACK_IDX); - flowFile = session.removeAllAttributes(flowFile, keysToRemove); - } - return flowFile; - } - - /** - * Builds a {@link Map} of FAILED_* attributes - * - * @see #FAILED_PROC_ID_ATTR - * @see #FAILED_LAST_ACK_IDX - * @see #FAILED_TOPIC_ATTR - * @see #FAILED_KEY_ATTR - * @see #FAILED_DELIMITER_ATTR - */ - private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) { - Map<String, String> attributes = new HashMap<>(); - attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier()); - attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex)); - attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue()); - attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue()); - attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DEMARCATOR).isSet() - ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue() : null); - return attributes; - } - - /** - * Returns 'true' if provided FlowFile is a failed FlowFile. A failed - * FlowFile contains {@link #FAILED_PROC_ID_ATTR}. - */ - private boolean isFailedFlowFile(FlowFile flowFile) { - return this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR)); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java deleted file mode 100644 index bda29e6..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import java.io.InputStream; -import java.nio.charset.StandardCharsets; - -/** - * Holder of context information used by {@link KafkaPublisher} required to - * publish messages to Kafka. - */ -class PublishingContext { - - private final InputStream contentStream; - - private final String topic; - - private final int lastAckedMessageIndex; - - /* - * We're using the default value from Kafka. We are using it to control the - * message size before it goes to to Kafka thus limiting possibility of a - * late failures in Kafka client. - */ - private int maxRequestSize = 1048576; // kafka default - - private boolean maxRequestSizeSet; - - private byte[] keyBytes; - - private byte[] delimiterBytes; - - PublishingContext(InputStream contentStream, String topic) { - this(contentStream, topic, -1); - } - - PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) { - this.validateInput(contentStream, topic, lastAckedMessageIndex); - this.contentStream = contentStream; - this.topic = topic; - this.lastAckedMessageIndex = lastAckedMessageIndex; - } - - @Override - public String toString() { - return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'"; - } - - int getLastAckedMessageIndex() { - return this.lastAckedMessageIndex; - } - - int getMaxRequestSize() { - return this.maxRequestSize; - } - - byte[] getKeyBytes() { - return this.keyBytes; - } - - byte[] getDelimiterBytes() { - return this.delimiterBytes; - } - - InputStream getContentStream() { - return this.contentStream; - } - - String getTopic() { - return this.topic; - } - - void setKeyBytes(byte[] keyBytes) { - if (this.keyBytes == null) { - if (keyBytes != null) { - this.assertBytesValid(keyBytes); - this.keyBytes = keyBytes; - } - } else { - throw new IllegalArgumentException("'keyBytes' can only be set once per instance"); - } - } - - void setDelimiterBytes(byte[] delimiterBytes) { - if (this.delimiterBytes == null) { - if (delimiterBytes != null) { - this.assertBytesValid(delimiterBytes); - this.delimiterBytes = delimiterBytes; - } - } else { - throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance"); - } - } - - void setMaxRequestSize(int maxRequestSize) { - if (!this.maxRequestSizeSet) { - if (maxRequestSize > 0) { - this.maxRequestSize = maxRequestSize; - this.maxRequestSizeSet = true; - } else { - throw new IllegalArgumentException("'maxRequestSize' must be > 0"); - } - } else { - throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance"); - } - } - - private void assertBytesValid(byte[] bytes) { - if (bytes != null) { - if (bytes.length == 0) { - throw new IllegalArgumentException("'bytes' must not be empty"); - } - } - } - - private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) { - if (contentStream == null) { - throw new IllegalArgumentException("'contentStream' must not be null"); - } else if (topic == null || topic.trim().length() == 0) { - throw new IllegalArgumentException("'topic' must not be null or empty"); - } else if (lastAckedMessageIndex < -1) { - throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1"); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor deleted file mode 100644 index 28b8393..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.processors.kafka.pubsub.PublishKafka -org.apache.nifi.processors.kafka.pubsub.ConsumeKafka \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html deleted file mode 100644 index 7f5cbf7..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/additionalDetails.html +++ /dev/null @@ -1,33 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - <head> - <meta charset="utf-8" /> - <title>ConsumeKafka</title> - <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> - </head> - - <body> - <!-- Processor Documentation ================================================== --> - <h2>Description:</h2> - <p> - This Processors polls <a href="http://kafka.apache.org/">Apache Kafka</a> - for data using KafkaConsumer API available with Kafka 0.9+. When a message is received - from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value - of the Kafka message. - </p> - </body> -</html>
