Running Mac OS X 10.10.5
Apache Maven 3.3.9
java version "1.8.0_72"
Java(TM) SE Runtime Environment (build 1.8.0_72-b15)
I've been trying to figure out how to use the GetMongo processor to output
to a PutHDFS processor.
Some things I think I've figured out:
*Limit* acts exactly like .limit() in Mongo: all it does is give you
the first *n* elements in a collection.
*Batch* isn't a command in Mongo (that I know of), and I can't see what
this entry actually does for the processor.
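For reference, the skip/limit semantics I'm describing can be sketched in plain Java, with a List standing in for the Mongo collection (the `page` helper here is hypothetical, just modeling find().skip(s).limit(n), not driver API):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PagingSketch {
    // Emulates collection.find().skip(skip).limit(limit) on an in-memory list.
    static List<Integer> page(List<Integer> docs, int skip, int limit) {
        return docs.stream().skip(skip).limit(limit).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> docs = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        // limit alone: the first n elements, like Mongo's .limit(n)
        System.out.println(page(docs, 0, 3)); // [0, 1, 2]
        // skip + limit together give one "batch" at an offset
        System.out.println(page(docs, 3, 3)); // [3, 4, 5]
    }
}
```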
I'm working with a collection in the millions, and I can't simply leave
the limit blank because the JVM runs out of memory. I tried to write my own
processor and got it to compile under *mvn clean install*, but when I
copy the .nar file from the '...nar/target' directory to the
'nifi-0.6.0/lib' folder and then try 'sh nifi.sh run' (or 'start'), NiFi
refuses to finish booting up and terminates itself.
Taking GetMongo.java
<https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java>
and its respective other files, I modified them and changed the following
method:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final ProcessorLog logger = getLogger();
    final MongoCollection<Document> collection = getCollection(context);

    int count = (int) collection.count();
    int next = context.getProperty(BATCH_SIZE).asInteger();
    int current = next;

    while (count >= current) {
        try {
            final FindIterable<Document> it = collection.find()
                    .skip(current)
                    .limit(context.getProperty(LIMIT).asInteger());
            final MongoCursor<Document> cursor = it.iterator();
            try {
                FlowFile flowFile = null;
                while (cursor.hasNext()) {
                    flowFile = session.create();
                    flowFile = session.write(flowFile, new OutputStreamCallback() {
                        @Override
                        public void process(OutputStream out) throws IOException {
                            IOUtils.write(cursor.next().toJson(), out);
                        }
                    });
                    session.getProvenanceReporter().receive(flowFile,
                            context.getProperty(URI).getValue());
                    session.transfer(flowFile, REL_SUCCESS);
                }
                session.commit();
            } finally {
                cursor.close();
            }
        } catch (final RuntimeException e) {
            context.yield();
            session.rollback();
        }
        current = current + next;
    }
}
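As a sanity check on the paging arithmetic, here is a pure-Java model of the loop, with a List standing in for the collection and 'batch' playing the role of BATCH_SIZE (this is my own sketch, not the processor code); note that to cover every document, the offset has to start at 0 and the limit has to match the skip step:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PagingCheck {
    // Models a skip/limit traversal: each iteration corresponds to
    // find().skip(current).limit(batch) on the real collection.
    static List<Integer> traverse(List<Integer> docs, int batch) {
        List<Integer> emitted = new ArrayList<>();
        int count = docs.size();
        // start at 0 and advance by 'batch'; starting at 'batch' instead
        // (as 'current = next' does in the method above) drops the first page
        for (int current = 0; current < count; current += batch) {
            int end = Math.min(current + batch, count);
            emitted.addAll(docs.subList(current, end));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> docs = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        // every document is emitted exactly once, even when size % batch != 0
        System.out.println(traverse(docs, 3).equals(docs)); // true
    }
}
```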
I also modified the tests and abstract classes so Maven would compile.
Any thoughts?
I'm trying to make a processor that can traverse an entire collection
in the millions, and later of /any/ size.
If anyone has already made one and can share, that'd be great too! Thanks!
--
View this message in context:
http://apache-nifi-developer-list.39713.n7.nabble.com/GetMongo-GC-Overflow-tp7729.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.