I want to reitreve data from folder and then update it and put it back to
its' destination and i want to make this operation many times , here is my
code i try to getfile and update it , but it doesn't roll back the file and
can't reitreve data with the same filename(sometimes when i start this
processor first time it reireives data and then rolls back updated one but
then it remembers state or flowfile information i guess and doen't reitreive
same updated file) , can someone help me what should i change to make this
code work?
here is error i got :2017-10-08 21:40:55,959 ERROR [Timer-Driven Process
Thread-9] Reader.MyProcessor
MyProcessor[id=fcaf839f-015e-1000-da5d-a3256b960a67]
MyProcessor[id=fcaf839f-015e-1000-da5d-a3256b960a67] failed to process due
to java.lang.IllegalArgumentException: Cannot transfer FlowFiles that are
created in this Session back to self; rolling back session: {}
java.lang.IllegalArgumentException: Cannot transfer FlowFiles that are
created in this Session back to self at
org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1848)
at Reader.MyProcessor.onTrigger(MyProcessor.java:732)
public class MyProcessor extends AbstractProcessor {
public String start, startDate, endDate, makeVersion, runAs, patch;
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
final StopWatch stopWatch = new StopWatch(true);
final File directory = new
File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
final boolean keepingSourceFile =
context.getProperty(KEEP_SOURCE_FILE).asBoolean();
final String conflictResponse =
context.getProperty(CONFLICT_RESOLUTION).getValue();
final Integer maxDestinationFiles = 30;
final ComponentLog logger = getLogger();
if (fileQueue.size() < 100) {
final long pollingMillis =
context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
if ((queueLastUpdated.get() < System.currentTimeMillis() -
pollingMillis) && listingLock.tryLock()) {
try {
final ArrayList<File> listing =
performListing(directory, fileFilterRef.get(),
context.getProperty(RECURSE).asBoolean().booleanValue());
queueLock.lock();
try {
listing.removeAll(inProcess);
if (!keepingSourceFile) {
listing.removeAll(recentlyProcessed);
}
fileQueue.clear();
fileQueue.addAll(listing);
queueLastUpdated.set(System.currentTimeMillis());
recentlyProcessed.clear();
if (listing.isEmpty()) {
context.yield();
}
} finally {
queueLock.unlock();
}
} finally {
listingLock.unlock();
}
}
}
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final List<File> files = new ArrayList<>(batchSize);
queueLock.lock();
try {
fileQueue.drainTo(files, batchSize);
if (files.isEmpty()) {
return;
} else {
inProcess.addAll(files);
}
} finally {
queueLock.unlock();
}
//make xml parsing
DocumentBuilderFactory dbFactory =
DocumentBuilderFactory.newInstance();
try {
dBuilder = dbFactory.newDocumentBuilder();
} catch (ParserConfigurationException e) {
e.printStackTrace();
}
try {
File f = files.get(0);
doc = dBuilder.parse(f);
} catch (IOException e) {
e.printStackTrace();
} catch (org.xml.sax.SAXException e) {
e.printStackTrace();
}
NodeList nList = doc.getElementsByTagName("localAttributes");
for (int temp = 0; temp < nList.getLength(); temp++) {
Node nNode = nList.item(temp);
if (nNode.getNodeType() == Node.ELEMENT_NODE) {
Element eElement = (Element) nNode;
start =
eElement.getElementsByTagName("start").item(0).getTextContent();
startDate =
eElement.getElementsByTagName("startDate").item(0).getTextContent();
endDate =
eElement.getElementsByTagName("endDate").item(0).getTextContent();
patch =
eElement.getElementsByTagName("patch").item(0).getTextContent();
runAs =
eElement.getElementsByTagName("runAs").item(0).getTextContent();
makeVersion =
eElement.getElementsByTagName("makeVersion").item(0).getTextContent();
}
}
final ListIterator<File> itr = files.listIterator();
FlowFile flowFile = null;
try {
final Path directoryPath = directory.toPath();
while (itr.hasNext()) {
final File file = itr.next();
final Path filePath = file.toPath();
final Path relativePath =
directoryPath.relativize(filePath.getParent());
String relativePathString = relativePath.toString() + "/";
if (relativePathString.isEmpty()) {
relativePathString = "./";
}
final Path absPath = filePath.toAbsolutePath();
final String absPathString = absPath.getParent().toString()
+ "/";
flowFile = session.create();
final long importStart = System.nanoTime();
flowFile = session.importFrom(filePath, keepingSourceFile,
flowFile);
final long importNanos = System.nanoTime() - importStart;
final long importMillis =
TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
flowFile = session.putAttribute(flowFile,
CoreAttributes.FILENAME.key(), file.getName());
flowFile = session.putAttribute(flowFile,
CoreAttributes.PATH.key(), relativePathString);
flowFile = session.putAttribute(flowFile,
CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
Map<String, String> attributes =
getAttributesFromFile(filePath);
if (attributes.size() > 0) {
flowFile = session.putAllAttributes(flowFile,
attributes);
}
FlowFile flowFile1 = session.create();
flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.FILENAME.key(), file.getName());
flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.PATH.key(), relativePathString);
flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
flowFile1 = session.putAttribute(flowFile1, "start", start);
flowFile1 = session.putAttribute(flowFile1, "startDate",
startDate);
flowFile1 = session.putAttribute(flowFile1, "endDate",
endDate);
flowFile1 = session.putAttribute(flowFile1, "runAs", runAs);
flowFile1 = session.putAttribute(flowFile1, "patch", patch);
flowFile1 = session.putAttribute(flowFile1, "makeVersion",
makeVersion);
flowFile1 = session.putAttribute(flowFile1, "filename",
"Configuration");
//session.getProvenanceReporter().receive(flowFile1,
file.toURI().toString(), importMillis);
InputStream ffStream = session.read(flowFile);
DocumentBuilderFactory builderFactory =
DocumentBuilderFactory.newInstance();
DocumentBuilder builder =
builderFactory.newDocumentBuilder();
Document xmlDocument = builder.parse(ffStream);
XPath xPath = XPathFactory.newInstance().newXPath();
XPathExpression myNodeList = (XPathExpression)
xPath.compile("/localAttributes");
Node nodeGettingChanged = (Node)
myNodeList.evaluate(xmlDocument, XPathConstants.NODE);
NodeList childNodes = nodeGettingChanged.getChildNodes();
for (int i = 0; i != childNodes.getLength(); ++i) {
Node child = childNodes.item(i);
if (!(child instanceof Element))
continue;
if (child.getNodeName().equals("runAs"))
child.getFirstChild().setNodeValue("false");
}
TransformerFactory transformerFactory =
TransformerFactory.newInstance();
Transformer transformer = null;
transformer = transformerFactory.newTransformer();
DOMSource source = new DOMSource(xmlDocument);
String path =
"C://Users//user//Desktop//nifi-1.3.0//nifi-assembly//target//nifi-1.3.0-bin//nifi-1.3.0//1//conf.xml";
File f = new File(path);
StreamResult file1 = new StreamResult(f);
try {
transformer.transform(source, file1);
} catch (TransformerException e) {
e.printStackTrace();
}
session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream inputStream,
OutputStream outputStream) throws IOException {
TransformerFactory transformerFactory =
TransformerFactory.newInstance();
Transformer transformer = null;
try {
transformer =
transformerFactory.newTransformer();
} catch (TransformerConfigurationException e) {
e.printStackTrace();
}
DOMSource source = new DOMSource(xmlDocument);
ffStream.close();
ByteArrayOutputStream bos = new
ByteArrayOutputStream();
StreamResult result = new StreamResult(bos);
try {
transformer.transform(source, result);
} catch (TransformerException e) {
e.printStackTrace();
}
byte[] array = bos.toByteArray();
outputStream.write(array);
}
});
Path tempDotCopyFile = null;
try {
final Path rootDirPath =
Paths.get("C://Users//user//Desktop//nifi-1.3.0//nifi-assembly//target//nifi-1.3.0-bin//nifi-1.3.0//1");
final Path tempCopyFile = rootDirPath.resolve("." +
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
final Path copyFile =
rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key()));
if (!Files.exists(rootDirPath)) {
if (context.getProperty(CREATE_DIRS).asBoolean()) {
Files.createDirectories(rootDirPath);
} else {
flowFile = session.penalize(flowFile);
session.transfer(flowFile);
logger.error("Penalizing {} and routing to
'failure' because the output directory {} does not exist and Processor is "
+ "configured not to create missing
directories", new Object[]{flowFile1, rootDirPath});
return;
}
}
final Path dotCopyFile = tempCopyFile;
tempDotCopyFile = dotCopyFile;
Path finalCopyFile = copyFile;
final Path finalCopyFileDir = finalCopyFile.getParent();
if (Files.exists(finalCopyFileDir) &&
maxDestinationFiles != null) { // check if too many files already
final int numFiles =
finalCopyFileDir.toFile().list().length;
if (numFiles >= maxDestinationFiles) {
flowFile= session.penalize(flowFile);
logger.warn("Penalizing {} and routing to
'failure' because the output directory {} has {} files, which exceeds the "
+ "configured maximum number of files",
new Object[]{flowFile, finalCopyFileDir, numFiles});
session.transfer(flowFile);
return;
}
}
if (Files.exists(finalCopyFile)) {
switch (conflictResponse) {
case REPLACE_RESOLUTION:
Files.delete(finalCopyFile);
logger.info("Deleted {} as configured in
order to replace with the contents of {}", new Object[]{finalCopyFile,
flowFile});
break;
case IGNORE_RESOLUTION:
session.transfer(flowFile, REL_SUCCESS);
logger.info("Transferring {} to success
because file with same name already exists", new Object[]{flowFile});
return;
case FAIL_RESOLUTION:
flowFile = session.penalize(flowFile);
logger.warn("Penalizing {} and routing to
failure as configured because file with the same name already exists", new
Object[]{flowFile});
session.transfer(flowFile);
return;
default:
break;
}
}
session.exportTo(flowFile, dotCopyFile, false);
final String permissions =
context.getProperty(CHANGE_PERMISSIONS).evaluateAttributeExpressions(flowFile1).getValue();
if (permissions != null &&
!permissions.trim().isEmpty()) {
try {
String perms = stringPermissions(permissions);
if (!perms.isEmpty()) {
Files.setPosixFilePermissions(dotCopyFile,
PosixFilePermissions.fromString(perms));
}
} catch (Exception e) {
logger.warn("Could not set file permissions to
{} because {}", new Object[]{permissions, e});
}
}
final String owner =
context.getProperty(CHANGE_OWNER).evaluateAttributeExpressions(flowFile1).getValue();
if (owner != null && !owner.trim().isEmpty()) {
try {
UserPrincipalLookupService lookupService =
dotCopyFile.getFileSystem().getUserPrincipalLookupService();
Files.setOwner(dotCopyFile,
lookupService.lookupPrincipalByName(owner));
} catch (Exception e) {
logger.warn("Could not set file owner to {}
because {}", new Object[]{owner, e});
}
}
final String group =
context.getProperty(CHANGE_GROUP).evaluateAttributeExpressions(flowFile1).getValue();
if (group != null && !group.trim().isEmpty()) {
try {
UserPrincipalLookupService lookupService =
dotCopyFile.getFileSystem().getUserPrincipalLookupService();
PosixFileAttributeView view =
Files.getFileAttributeView(dotCopyFile, PosixFileAttributeView.class);
view.setGroup(lookupService.lookupPrincipalByGroupName(group));
} catch (Exception e) {
logger.warn("Could not set file group to {}
because {}", new Object[]{group, e});
}
}
boolean renamed = false;
for (int i = 0; i < 10; i++) { // try rename up to 10
times.
if
(dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) {
renamed = true;
break;// rename was successful
}
Thread.sleep(100L);// try waiting a few ms to let
whatever might cause rename failure to resolve
}
if (!renamed) {
if (Files.exists(dotCopyFile) &&
dotCopyFile.toFile().delete()) {
logger.debug("Deleted dot copy file {}", new
Object[]{dotCopyFile});
}
throw new ProcessException("Could not rename: " +
dotCopyFile);
} else {
logger.info("Produced copy of {} at location {}",
new Object[]{flowFile1, finalCopyFile});
}
/*session.getProvenanceReporter().send(flowFile,
finalCopyFile.toFile().toURI().toString(),
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);*/
session.getProvenanceReporter().receive(flowFile1,
file.toURI().toString(), importMillis);
session.transfer(flowFile1, REL_SUCCESS);
session.remove(flowFile);
} catch (final Throwable t) {
if (tempDotCopyFile != null) {
try {
Files.deleteIfExists(tempDotCopyFile);
} catch (final Exception e) {
logger.error("Unable to remove temporary file {}
due to {}", new Object[]{tempDotCopyFile, e});
}
}
flowFile = session.penalize(flowFile1);
logger.error("Penalizing {} and transferring to failure
due to {}", new Object[]{flowFile1, t});
session.transfer(flowFile1);
}
}
if (!isScheduled()) { // if processor stopped, put the rest of
the files back on the queue.
queueLock.lock();
try {
while (itr.hasNext()) {
final File nextFile = itr.next();
fileQueue.add(nextFile);
inProcess.remove(nextFile);
}
} finally {
queueLock.unlock();
}
}
} catch (IOException e1) {
e1.printStackTrace();
} catch (TransformerConfigurationException e1) {
e1.printStackTrace();
} catch (ParserConfigurationException e1) {
e1.printStackTrace();
} catch (SAXException e1) {
e1.printStackTrace();
} catch (XPathExpressionException e1) {
e1.printStackTrace();
}
session.commit();
}
protected String stringPermissions(String perms) {
String permissions = "";
final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");
final Pattern numPattern = Pattern.compile("\\d+");
if (rwxPattern.matcher(perms).matches()) {
permissions = perms;
} else if (numPattern.matcher(perms).matches()) {
try {
int number = Integer.parseInt(perms, 8);
StringBuilder permBuilder = new StringBuilder();
if ((number & 0x100) > 0) {
permBuilder.append('r');
} else {
permBuilder.append('-');
}
if ((number & 0x80) > 0) {
permBuilder.append('w');
} else {
permBuilder.append('-');
}
if ((number & 0x40) > 0) {
permBuilder.append('x');
} else {
permBuilder.append('-');
}
if ((number & 0x20) > 0) {
permBuilder.append('r');
} else {
permBuilder.append('-');
}
if ((number & 0x10) > 0) {
permBuilder.append('w');
} else {
permBuilder.append('-');
}
if ((number & 0x8) > 0) {
permBuilder.append('x');
} else {
permBuilder.append('-');
}
if ((number & 0x4) > 0) {
permBuilder.append('r');
} else {
permBuilder.append('-');
}
if ((number & 0x2) > 0) {
permBuilder.append('w');
} else {
permBuilder.append('-');
}
if ((number & 0x8) > 0) {
permBuilder.append('x');
} else {
permBuilder.append('-');
}
permissions = permBuilder.toString();
} catch (NumberFormatException ignore) {
}
}
return permissions;
}
}
--
Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/