[ https://issues.apache.org/jira/browse/NIFI-1778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15246809#comment-15246809 ]
ASF GitHub Bot commented on NIFI-1778:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/361#discussion_r60155646
--- Diff: nifi-external/nifi-storm-spout/src/main/java/org/apache/nifi/storm/NiFiBolt.java ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.storm;
+
+import backtype.storm.Config;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A Storm bolt that can send tuples back to NiFi. This bolt provides a micro-batching approach for higher
+ * throughput scenarios. The bolt will queue tuples until the number of tuples reaches the provided batch size, or
+ * until the provided batch interval in seconds has been exceeded. Setting the batch size to 1 will send each tuple
+ * immediately in a single transaction.
+ */
+public class NiFiBolt extends BaseRichBolt {
+
+ private static final long serialVersionUID = 3067274587595578836L;
+ public static final Logger LOGGER = LoggerFactory.getLogger(NiFiBolt.class);
+
+ private final SiteToSiteClientConfig clientConfig;
+ private final NiFiDataPacketBuilder builder;
+ private final int tickFrequencySeconds;
+
+ private SiteToSiteClient client;
+ private OutputCollector collector;
+ private BlockingQueue<Tuple> queue = new LinkedBlockingQueue<>();
+
+ private int batchSize = 10;
+ private int batchIntervalInSec = 10;
+ private long lastBatchProcessTimeSeconds = 0;
--- End diff --
Any chance these attributes will be modified/accessed by multiple threads?
Just wondering if we should make them volatile.
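
To illustrate the question, here is a minimal sketch of what volatile batch settings could look like. It assumes the fields are written by one thread (for example during topology setup) and read by another (the executor calling the bolt). The class name BatchSettings and the methods withBatchSize, shouldFlush and markFlushed are hypothetical and not part of the pull request; only the three field names and their defaults come from the diff above.

```java
// Hypothetical helper, not part of the pull request: it isolates the three
// fields from the diff so the effect of 'volatile' can be shown on its own.
final class BatchSettings {

    // volatile guarantees that a write by one thread is visible to later
    // reads by other threads. It does NOT make compound updates atomic.
    private volatile int batchSize = 10;
    private volatile int batchIntervalInSec = 10;
    private volatile long lastBatchProcessTimeSeconds = 0;

    // Assumed fluent setter, typically called before the topology is submitted.
    BatchSettings withBatchSize(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
        return this;
    }

    // True when enough tuples are queued or the batch interval has elapsed.
    boolean shouldFlush(int queuedTuples, long nowSeconds) {
        return queuedTuples >= batchSize
                || (nowSeconds - lastBatchProcessTimeSeconds) >= batchIntervalInSec;
    }

    // Records the time of the most recent flush.
    void markFlushed(long nowSeconds) {
        lastBatchProcessTimeSeconds = nowSeconds;
    }
}
```

If the fields are only ever touched by a single thread after construction, volatile is unnecessary. If several threads update them together, volatile alone is not enough and the updates would need a lock or atomic types.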
> Create a Storm bolt to push back to NiFi
> ----------------------------------------
>
> Key: NIFI-1778
> URL: https://issues.apache.org/jira/browse/NIFI-1778
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 0.7.0
>
>
> We have a NiFiSpout for Storm to pull data from NiFi. We should also provide
> a NiFiBolt and a NiFiDataPacketBuilder that converts Tuples to DataPackets
> and sends them back to NiFi.