Yes, happens everytime, I started spark job after starting NIFI as well.same
result.
here is my sample spark application
package com.dtcc.nifi;
import java.nio.charset.StandardCharsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.spark.NiFiDataPacket;
import org.apache.nifi.spark.NiFiReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class MyProcessor {
private final static Log log = LogFactory.getLog(MyProcessor.class);
public static void main(String args[]){
System.out.println("blah***");
printToDebugLogsAndConsole("Satarting ...", "starting ......",
log);
SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
.url("http://sxmn5:8080/nifi")
.portName("Data_For_Spark")
.buildConfig();
SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark
Streaming
example");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
new
Duration(1000L));
// Create a JavaReceiverInputDStream using a NiFi receiver so
that we can
pull data from
// specified Port
JavaReceiverInputDStream packetStream =
ssc.receiverStream(new NiFiReceiver(config,
StorageLevel.MEMORY_ONLY()));
// Map the data from NiFi to text, ignoring the attributes
JavaDStream text = packetStream.map(new
Function<NiFiDataPacket,
String>() {
@Override
public String call(NiFiDataPacket dataPacket) throws
Exception {
// TODO Auto-generated method stub
System.out.println("blah***");
printToDebugLogsAndConsole("blah###", new
String(dataPacket.getContent()), log);
return dataPacket.getAttributes().get("uuid");
}
});
text.print();
ssc.start();
ssc.awaitTermination(1000000000);// make this 0 eventually
}
public static void printToDebugLogsAndConsole(String description,
String msg,Log log) {
String lineBling = "******* ";
log.info(lineBling + "info: " + description + " : " + msg);
log.debug(lineBling + "debug: " + description + " : " + msg);
System.out.println(lineBling + "sysout: " + description + " : "
+ msg);
}
public static void printToErrorLogsAndConsole(String description,
String msg,Log log) {
String lineBling = "$$$$$$$ ";
log.error(lineBling + "error: " + description + " : " + msg);
System.out.println(lineBling + "sysout: " + description + " : "
+ msg);
}
}
--
View this message in context:
http://apache-nifi-developer-list.39713.n7.nabble.com/site-to-site-communication-error-on-output-port-tp10698p10709.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.