Hello, Here is an example of how a test that uses LocalMode can be shutdown gracefully.
https://github.com/chandnisingh/Malhar/blob/examples/library/src/test/java/com/datatorrent/lib/io/fs/ApplicationTest.java Thanks, Chandni On Thu, Sep 3, 2015 at 1:40 PM, Chetan Narsude <[email protected]> wrote: > Hi Ilya, > > It looks like the input operator is taking too long to do finish > emitTuples. You can look at StreamingContainer.undeploy call to see what's > happening soon after lc.shutdown is called. From the error it looks like > the input operator is so busy emitting that it's even ignoring the > interrupt. > > Perhaps you can do emitTuples at smaller batches but if you are expecting > that your app shuts itself down completely gracefully in 15 secs lest it's > an error condition, the best approach is to have your input operators > raise the ShutdownException or simply call BaseOperator.shutdown(). > > Input operator shuts itself down when it raises ShutdownException. Other > operators are shutdown when all upstream operators feeding to it have shut > themselves down. The app shuts itself down when the last active operator > shuts itself down. > > -- > Chetan > > On Thu, Sep 3, 2015 at 1:14 PM, Thomas Weise <[email protected]> > wrote: > > > Ilya, > > > > In your code there is a hard stop after 15s. There are other options to > do > > this: > > > > - The application itself exits when it is done, that is if the input > > operator raises the ShutdownException which leads to graceful > > termination. > > - In your test code, let the cluster run asynchronously, check for > > existence of the expected output (file for example) and call shutdown > > when > > the file is complete. > > > > Thomas > > > > On Thu, Sep 3, 2015 at 12:36 PM, Ganelin, Ilya < > > [email protected]> > > wrote: > > > > > Hello all – I’m using the following code to execute a topology locally: > > > > > > try { > > > LocalMode lma = LocalMode.newInstance(); > > > Configuration conf = new Configuration(false); > > > > > > > > > conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); > > > lma.prepareDAG(new Application(), conf); > > > LocalMode.Controller lc = lma.getController(); > > > lc.run(15000); // runs for 10 seconds and quits > > > lc.shutdown(); > > > } catch (ConstraintViolationException e) { > > > Assert.fail("constraint violations: " + > e.getConstraintViolations()); > > > } > > > > > > I am reading from HDFS, operating on the data, and writing it back to > > > HDFS. The HDFS operator implementations extend > AbstractfileInputOperator > > > and AbstractfileOutputOperator respectively. > > > > > > My issue is that when the timer ends, my operators fail > catastrophically, > > > without completing their current operations (e.g. if reading/writing > from > > > HDFS they don’t close the file stream). Is there a way for this to > close > > > gracefully? Is this an issue with the implementations of the operators > or > > > with the way that local topologies are executed? > > > > > > Error stacks below: > > > > > > java.util.concurrent.RejectedExecutionException: Task > > > java.util.concurrent.FutureTask@3848436c rejected from > > > java.util.concurrent.ThreadPoolExecutor@770deac6[Terminated, pool > size = > > > 0, active threads = 0, queued tasks = 0, completed tasks = 20121] > > > at > > > > > > java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) > > > at > > > > > > java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) > > > at > > > > > > java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) > > > at > > > > > > java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) > > > at > > > > > > java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678) > > > at > > com.datatorrent.bufferserver.internal.DataList.flush(DataList.java:226) > > > at > > > > > > com.datatorrent.bufferserver.server.Server$Publisher.read(Server.java:642) > > > at com.datatorrent.netlet.AbstractClient.read(AbstractClient.java:117) > > > at > > > > > > com.datatorrent.netlet.DefaultEventLoop.handleSelectedKey(DefaultEventLoop.java:295) > > > at > > > > > > com.datatorrent.netlet.DefaultEventLoop.runEventLoop(DefaultEventLoop.java:252) > > > at > com.datatorrent.netlet.DefaultEventLoop.run(DefaultEventLoop.java:100) > > > at java.lang.Thread.run(Thread.java:745) > > > 2015-09-03 12:23:20,545 [2/RecordMaker I:RecordMaker] DEBUG > > > engine.StreamingContainer teardownNode - deactivated 2 > > > 2015-09-03 12:23:20,544 [IPC Parameter Sending Thread #0] DEBUG > > ipc.Client > > > run - IPC Client (467473545) connection to > > > mdcilabpen01.kdc.capitalone.com/10.24.28.46:8020 from zjb238 sending > #14 > > > 2015-09-03 12:23:20,543 [9/HdfsOutHdht:HdfsFileOutputOperator] DEBUG > > > hdfs.DFSClient writeChunkImpl - DFSClient writeChunk allocating new > > packet > > > seqno=0, src=/tmp/citadel_out/latencies_hdht.txt.1441308190967.tmp, > > > packetSize=65016, chunksPerPacket=126, bytesCurBlock=0 > > > 2015-09-03 12:23:20,546 [1/NewLines:NewlineFileInputOperator] ERROR > > > fs.AbstractFileInputOperator failureHandling - FS reader error > > > java.io.IOException: Filesystem closed > > > at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808) > > > at > > > > > > org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:830) > > > at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:896) > > > at java.io.DataInputStream.read(DataInputStream.java:149) > > > at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) > > > at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) > > > at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) > > > at java.io.InputStreamReader.read(InputStreamReader.java:184) > > > at java.io.BufferedReader.fill(BufferedReader.java:161) > > > at java.io.BufferedReader.readLine(BufferedReader.java:324) > > > at java.io.BufferedReader.readLine(BufferedReader.java:389) > > > at > > > > > > com.capitalone.vault8.citadel.operators.impl.NewlineFileInputOperator.readEntity(NewlineFileInputOperator.java:78) > > > at > > > > > > com.capitalone.vault8.citadel.operators.impl.NewlineFileInputOperator.readEntity(NewlineFileInputOperator.java:22) > > > at > > > > > > com.datatorrent.lib.io.fs.AbstractFileInputOperator.emitTuples(AbstractFileInputOperator.java:653) > > > at com.datatorrent.stram.engine.InputNode.run(InputNode.java:115) > > > at > > > > > > com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1363) > > > > > > > > > > > > > > > ________________________________________________________ > > > > > > The information contained in this e-mail is confidential and/or > > > proprietary to Capital One and/or its affiliates and may only be used > > > solely in performance of work or services for Capital One. The > > information > > > transmitted herewith is intended only for use by the individual or > entity > > > to which it is addressed. If the reader of this message is not the > > intended > > > recipient, you are hereby notified that any review, retransmission, > > > dissemination, distribution, copying or other use of, or taking of any > > > action in reliance upon this information is strictly prohibited. If you > > > have received this communication in error, please contact the sender > and > > > delete the material from your computer. > > > > > >
