markap14 commented on a change in pull request #4730:
URL: https://github.com/apache/nifi/pull/4730#discussion_r561112193



##########
File path: 
nifi-external/nifi-kafka-connect/nifi-kafka-connector/src/main/java/org/apache/nifi/kafka/connect/StatelessNiFiSourceTask.java
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.kafka.connect;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stateless.flow.DataflowTrigger;
+import org.apache.nifi.stateless.flow.StatelessDataflow;
+import org.apache.nifi.stateless.flow.TriggerResult;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+public class StatelessNiFiSourceTask extends SourceTask {
+    public static final String STATE_MAP_KEY = "task.index";
+    private static final Logger logger = 
LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
+
+    private StatelessDataflow dataflow;
+    private String outputPortName;
+    private String topicName;
+    private String topicNameAttribute;
+    private TriggerResult triggerResult;
+    private String keyAttributeName;
+    private Pattern headerAttributeNamePattern;
+    private long timeoutMillis;
+    private String dataflowName;
+    private long failureYieldExpiration = 0L;
+
+    private final Map<String, String> clusterStatePartitionMap = 
Collections.singletonMap(STATE_MAP_KEY, "CLUSTER");
+    private Map<String, String> localStatePartitionMap = new HashMap<>();
+    private boolean primaryNodeOnly;
+    private boolean primaryNodeTask;
+
+    private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
+
+    @Override
+    public String version() { // version string reported for this source task
+        return StatelessKafkaConnectorUtil.getVersion(); // delegates to StatelessKafkaConnectorUtil rather than hard-coding a value here
+    }
+
+    @Override
+    public void start(final Map<String, String> properties) {
+        logger.info("Starting Source Task with properties {}", 
StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+
+        final String timeout = 
properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, 
StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+        timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, 
TimeUnit.MILLISECONDS);
+
+        topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
+        topicNameAttribute = 
properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
+        keyAttributeName = 
properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
+
+        if (topicName == null && topicNameAttribute == null) {
+            throw new ConfigException("Either the topic.name or 
topic.name.attribute configuration must be specified");
+        }
+
+        final String headerRegex = 
properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
+        headerAttributeNamePattern = headerRegex == null ? null : 
Pattern.compile(headerRegex);
+
+        dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+        primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly();
+
+        // Determine the name of the Output Port to retrieve data from
+        dataflowName = 
properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+        outputPortName = 
properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
+        if (outputPortName == null) {
+            final Set<String> outputPorts = dataflow.getOutputPortNames();
+            if (outputPorts.isEmpty()) {
+                throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> does not have an Output Port at the root level. Dataflows 
used for a Kafka Connect Source Task "
+                    + "must have at least one Output Port at the root level.");
+            }
+
+            if (outputPorts.size() > 1) {
+                throw new ConfigException("The dataflow specified for <" + 
dataflowName + "> has multiple Output Ports at the root level (" + 
outputPorts.toString()
+                    + "). The " + 
StatelessNiFiSourceConnector.OUTPUT_PORT_NAME + " property must be set to 
indicate which of these Ports Kafka records should be retrieved from.");
+            }
+
+            outputPortName = outputPorts.iterator().next();
+        }
+
+        final String taskIndex = properties.get(STATE_MAP_KEY);
+        localStatePartitionMap.put(STATE_MAP_KEY, taskIndex);
+        primaryNodeTask = "0".equals(taskIndex);
+
+        if (primaryNodeOnly && !primaryNodeTask) {
+            logger.warn("Configured Dataflow ({}) requires that the source be 
run only on the Primary Node, but the Connector is configured with more than 
one task. The dataflow will only be run by" +

Review comment:
       Ah, it didn't occur to me that creating fewer task configs than 
`maxTasks` is an option. Will do. Great catch!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to