Joe,

Replies below.

Rick

-----Original Message-----
From: Joe Witt [mailto:joe.w...@gmail.com]
Sent: Saturday, September 12, 2015 7:02 AM
To: dev@nifi.apache.org
Subject: Re: Transfer relationship not specified (FlowFileHandlingException)

Rick

Can you show what is happening in the exception handling part of your code as well?

Yes. In this version, sending (empty) flowfiles for directory entries is bypassed, so only files get processed, and attributes are also disabled (which did not help).

session.commit() is throwing the exception; there are no other errors or issues from session.importFrom() or session.transfer().

Here's the entire onTrigger method:

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
        final ProcessorLog logger = getLogger();

        final int queueMax = context.getProperty(PROCESS_QUEUE_SIZE).asInteger();
        if (fileQueue.size() < queueMax && filelistLock.tryLock()) {
            try {
                final Set<File> filelist = getFileList(context, session);

                queueLock.lock();
                try {
                    filelist.removeAll(inProcess);
                    if (!keepingSourceFile) {
                        filelist.removeAll(recentlyProcessed);
                    }

                    fileQueue.clear();
                    fileQueue.addAll(filelist);

                    queueLastUpdated.set(System.currentTimeMillis());
                    recentlyProcessed.clear();

                    if (filelist.isEmpty()) {
                        context.yield();
                    }
                } finally {
                    queueLock.unlock();
                }
            } finally {
                filelistLock.unlock();
            }
        }

        final int batchSize = context.getProperty(PROCESS_BATCH_SIZE).asInteger();
        final List<File> files = new ArrayList<>(batchSize);
        queueLock.lock();
        try {
            fileQueue.drainTo(files, batchSize);
            if (files.isEmpty()) {
                return;
            } else {
                inProcess.addAll(files);
            }
        } finally {
            queueLock.unlock();
        }

        final ListIterator<File> itr = files.listIterator();
        FlowFile flowFile = null;
        try {
            while (itr.hasNext()) {
                final File file = itr.next();
                final Path filePath = file.toPath();
                final Path relativePath = filePath.relativize(filePath.getParent());
                String relativePathString = relativePath.toString() + "/";
                if (relativePathString.isEmpty()) {
                    relativePathString = "./";
                }
                final Path absPath = filePath.toAbsolutePath();
                final String absPathString = absPath.getParent().toString() + "/";

                final long importStart = System.nanoTime();
                String fileType = "directory";
                if (file.isFile()){
                    fileType = "file";
                    flowFile = session.create();
                    flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
                } else {
                    logger.info("skipping directory {} and not placing into output flow", new Object[]{file});
                    continue; // ******* SKIP DIRECTORIES FOR NOW ****
                }

                final long importNanos = System.nanoTime() - importStart;
                final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);

//              flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
//              flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
//              flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
//              flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
//              Map<String, String> attributes = getAttributesFromFile(filePath);
//              if (attributes.size() > 0) {
//                  flowFile = session.putAllAttributes(flowFile, attributes);
//              }

                final String fileURI = file.toURI().toString();
                session.getProvenanceReporter().receive(flowFile, fileURI, importMillis);
                session.transfer(flowFile, REL_SUCCESS);
                logger.info("added {} to flow", new Object[]{flowFile});

                if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
                    queueLock.lock();
                    try {
                        while (itr.hasNext()) {
                            final File nextFile = itr.next();
                            fileQueue.add(nextFile);
                            inProcess.remove(nextFile);
                        }
                    } finally {
                        queueLock.unlock();
                    }
                }
            }
            session.commit();
        } catch (final Exception e) {
            logger.error("Failed to transfer files due to {}", e);
            context.yield();

            // anything that we've not already processed needs to be put back on the queue
            if (flowFile != null) {
                session.remove(flowFile);
            }
        } finally {
            queueLock.lock();
            try {
                inProcess.removeAll(files);
                recentlyProcessed.addAll(files);
            } finally {
                queueLock.unlock();
            }
        }
    }

Also please confirm which codebase you're running against. Latest HEAD of master?

I'm using a snapshot from GitHub that's several weeks old, from August 25th (it's working fine with the original GetFile processor, which this code was derived from).

Thanks
Joe

On Sat, Sep 12, 2015 at 7:21 AM, Rick Braddy <rbra...@softnas.com> wrote:
> So the "transfer relationship not specified" occurs down in the Provenance
> processing, where it checks to see if there are flowfile records associated
> with the session/relationship.
>
> There are. When I inspect session.flowfilesout it's equal to 6, which is the
> correct number of calls to importFrom and transfer(), so this confirms that
> transfer() is called and did record the outbound flowfiles, yet when the
> provenance subsystem looks for these records it does not find them.
>
> Not being intimate with the internals of the framework yet, not sure what
> would cause this.
>
> Rick
>
> -----Original Message-----
> From: Rick Braddy
> Sent: Friday, September 11, 2015 8:26 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified
> (FlowFileHandlingException)
>
> Mark,
>
> The interesting thing is that session.transfer() is being called, as I have
> stepped through it in the debugger. I'm only calling importFrom() for actual
> files (not directories), as shown below. This is a modified version of
> GetFile processor.
>
> Rick
>
>         final ListIterator<File> itr = files.listIterator();
>         FlowFile flowFile = null;
>         try {
>             while (itr.hasNext()) {
>                 final File file = itr.next();
>                 final Path filePath = file.toPath();
>                 final Path relativePath = filePath.relativize(filePath.getParent());
>                 String relativePathString = relativePath.toString() + "/";
>                 if (relativePathString.isEmpty()) {
>                     relativePathString = "./";
>                 }
>                 final Path absPath = filePath.toAbsolutePath();
>                 final String absPathString = absPath.getParent().toString() + "/";
>
>                 final long importStart = System.nanoTime();
>                 String fileType = "directory";
>                 flowFile = session.create();
>                 if (file.isFile()){
>                     fileType = "file";
>                     flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
>                 }
>                 final long importNanos = System.nanoTime() - importStart;
>                 final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);
>
>                 flowFile = session.putAttribute(flowFile, "file_type", fileType); // directory or file
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
>                 flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
>                 Map<String, String> attributes = getAttributesFromFile(filePath);
>                 if (attributes.size() > 0) {
>                     flowFile = session.putAllAttributes(flowFile, attributes);
>                 }
>
>                 session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
>                 session.transfer(flowFile, REL_SUCCESS);
>                 logger.info("added {} to flow", new Object[]{flowFile});
>
>                 if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
>                     queueLock.lock();
>                     try {
>                         while (itr.hasNext()) {
>                             final File nextFile = itr.next();
>                             fileQueue.add(nextFile);
>                             inProcess.remove(nextFile);
>                         }
>                     } finally {
>                         queueLock.unlock();
>                     }
>                 }
>             }
>             session.commit();
>         } catch (final Exception e) {
>             logger.error("Failed to transfer files due to {}", e);
>
> -----Original Message-----
> From: Mark Payne [mailto:marka...@hotmail.com]
> Sent: Friday, September 11, 2015 6:39 PM
> To: dev@nifi.apache.org
> Subject: RE: Transfer relationship not specified
> (FlowFileHandlingException)
>
> Rick,
> This error message isn't indicating that there's no Connection for the
> Relationship, but rather that the FlowFile was never transferred.
> I.e., there was never a call to session.transfer() for that FlowFile.
> Thanks
> -Mark
>
>> From: rbra...@softnas.com
>> To: dev@nifi.apache.org
>> Subject: RE: Transfer relationship not specified
>> (FlowFileHandlingException)
>> Date: Fri, 11 Sep 2015 23:25:33 +0000
>>
>> Some more details:
>>
>> 2015-09-11 18:23:23,743 ERROR [Timer-Driven Process Thread-3]
>> c.s.c.processors.files.GetFileData
>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] Failed to
>> process session due to
>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190]
>> is not known in this session (StandardProcessSession[id=6967]):
>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=58429f42-cd3c-481a-83ea-86d058de3515,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@fe1ea005,offset=0,name=printargs.c,size=190]
>> is not known in this session (StandardProcessSession[id=6967])
>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3]
>> c.s.c.processors.files.GetFileData
>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added
>> StandardFlowFileRecord[uuid=a9e8b8e6-1f27-4fbd-b7c4-7bf4be9ec444,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@ff0cad6b,offset=0,name=anImage.png,size=16418]
>> to flow
>> 2015-09-11 18:23:23,744 INFO [Timer-Driven Process Thread-3]
>> c.s.c.processors.files.GetFileData
>> GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0] added
>> StandardFlowFileRecord[uuid=a324aaff-a340-499d-9904-2421b2bfc4a8,claim=,offset=0,name=in,size=0]
>> to flow ...
>>
>> So it seems there's some issue with each of the FlowFiles...
>>
>> -----Original Message-----
>> From: Rick Braddy [mailto:rbra...@softnas.com]
>> Sent: Friday, September 11, 2015 6:00 PM
>> To: dev@nifi.apache.org
>> Subject: Transfer relationship not specified
>> (FlowFileHandlingException)
>>
>> Hi,
>>
>> I have a processor that appears to be creating FlowFiles correctly (modified
>> a standard processor), but when it goes to commit() the session, an
>> exception is raised:
>>
>> 2015-09-11 17:37:24,690 ERROR [Timer-Driven Process Thread-6]
>> c.s.c.processors.files.GetFileData
>> [GetFileData[id=8f5e644d-591c-4df1-8c79-feea118bd8c0]] Failed to
>> retrieve files due to {}
>> org.apache.nifi.processor.exception.FlowFileHandlingException:
>> StandardFlowFileRecord[uuid=7ec0f931-6fdb-4adb-829d-80d564bd3d31,claim=org.apache.nifi.controller.repository.claim.StandardContentClaim@93964c66,offset=244,name=225120878343804,size=42]
>> transfer relationship not specified
>>
>> I'm assuming this is supposed to be indicating there's no connection
>> available to commit the transfer; however, there is a "success" relationship
>> registered during init() in the same way as the original processor did it,
>> and the success relationship out is connected to another processor input as
>> it should be.
>>
>> Any suggestions for troubleshooting?
>>
>> Rick
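
Mark's point earlier in the thread (commit fails for any FlowFile that was created but never passed to session.transfer()) can be illustrated with a toy model. This is only a sketch under stated assumptions: SessionSketch and its methods are hypothetical names made up for this note, not NiFi's ProcessSession API, FlowFiles are reduced to plain strings, and the exception messages merely imitate the wording seen in the logs. The "is not known in this session" check, and the choice to forget every FlowFile after a successful commit, are likewise assumptions of the toy model rather than a description of the framework's internals.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical toy model of a session; NOT NiFi's StandardProcessSession. */
final class SessionSketch {
    // FlowFile name -> relationship chosen via transfer(); null until transferred.
    private final Map<String, String> records = new LinkedHashMap<>();

    /** Registers a new FlowFile with no transfer relationship yet. */
    String create(String name) {
        records.put(name, null);
        return name;
    }

    /** Records the relationship for a FlowFile this session is tracking. */
    void transfer(String flowFile, String relationship) {
        if (!records.containsKey(flowFile)) {
            throw new IllegalStateException(flowFile + " is not known in this session");
        }
        records.put(flowFile, relationship);
    }

    /** Fails if any tracked FlowFile was never transferred; otherwise forgets them all. */
    int commit() {
        for (Map.Entry<String, String> entry : records.entrySet()) {
            if (entry.getValue() == null) {
                throw new IllegalStateException(entry.getKey() + " transfer relationship not specified");
            }
        }
        int committed = records.size();
        records.clear(); // toy-model assumption: committed FlowFiles are no longer tracked
        return committed;
    }

    public static void main(String[] args) {
        SessionSketch session = new SessionSketch();
        String file = session.create("anImage.png");
        String dir = session.create("in");       // created but not yet transferred
        session.transfer(file, "success");
        try {
            session.commit();                     // fails: "in" has no relationship
        } catch (IllegalStateException e) {
            System.out.println("commit failed: " + e.getMessage());
        }
        session.transfer(dir, "success");
        System.out.println("committed " + session.commit() + " FlowFiles");
    }
}
```

In this model, one forgotten transfer() is enough to make the whole commit fail, even though every other FlowFile in the batch was handled correctly.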