Ilya,

Your code imposes a hard stop after 15 seconds, whether or not the operators
have finished their work. There are other ways to end the run:

   - The application exits on its own when it is done: the input operator
   throws ShutdownException, which leads to graceful termination.
   - In your test code, let the cluster run asynchronously, check for the
   existence of the expected output (a file, for example) and call shutdown
   once the file is complete.
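
For the second option, a minimal sketch of the polling part might look like
the following. OutputWaiter and awaitFile are hypothetical names, not part of
the Apex API, and the sketch uses java.nio.file against a local path; for HDFS
you would swap in the equivalent Hadoop FileSystem existence check. It also
assumes that the final file name only appears once the output operator has
finished the file, which you should verify for your operator.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: waits for an expected output file to appear.
public final class OutputWaiter {
    private OutputWaiter() {}

    /**
     * Polls until {@code expected} exists or the timeout elapses.
     * Returns true if the file appeared in time, false otherwise.
     */
    public static boolean awaitFile(Path expected, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!Files.exists(expected)) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out; the caller should fail the test
            }
            Thread.sleep(pollMillis);
        }
        return true;
    }
}
```

In the test you would then start the cluster with lc.runAsync() instead of
lc.run(15000), wait with something like OutputWaiter.awaitFile(expectedFile,
60000, 500), and call lc.shutdown() afterwards, ideally in a finally block.
The path and the timeout values are placeholders.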

Thomas

On Thu, Sep 3, 2015 at 12:36 PM, Ganelin, Ilya <[email protected]>
wrote:

> Hello all – I’m using the following code to execute a topology locally:
>
> try {
>     LocalMode lma = LocalMode.newInstance();
>     Configuration conf = new Configuration(false);
>     conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
>     lma.prepareDAG(new Application(), conf);
>     LocalMode.Controller lc = lma.getController();
>     lc.run(15000); // runs for 15 seconds and quits
>     lc.shutdown();
> } catch (ConstraintViolationException e) {
>     Assert.fail("constraint violations: " + e.getConstraintViolations());
> }
>
> I am reading from HDFS, operating on the data, and writing it back to
> HDFS. The HDFS operator implementations extend AbstractFileInputOperator
> and AbstractFileOutputOperator respectively.
>
> My issue is that when the timer ends, my operators fail catastrophically,
> without completing their current operations (e.g. if reading/writing from
> HDFS they don’t close the file stream). Is there a way for this to close
> gracefully? Is this an issue with the implementations of the operators or
> with the way that local topologies are executed?
>
> Error stacks below:
>
> java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@3848436c rejected from java.util.concurrent.ThreadPoolExecutor@770deac6[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20121]
>     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
>     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
>     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
>     at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
>     at java.util.concurrent.Executors$DelegatedExecutorService.submit(Executors.java:678)
>     at com.datatorrent.bufferserver.internal.DataList.flush(DataList.java:226)
>     at com.datatorrent.bufferserver.server.Server$Publisher.read(Server.java:642)
>     at com.datatorrent.netlet.AbstractClient.read(AbstractClient.java:117)
>     at com.datatorrent.netlet.DefaultEventLoop.handleSelectedKey(DefaultEventLoop.java:295)
>     at com.datatorrent.netlet.DefaultEventLoop.runEventLoop(DefaultEventLoop.java:252)
>     at com.datatorrent.netlet.DefaultEventLoop.run(DefaultEventLoop.java:100)
>     at java.lang.Thread.run(Thread.java:745)
> 2015-09-03 12:23:20,545 [2/RecordMaker I:RecordMaker] DEBUG engine.StreamingContainer teardownNode - deactivated 2
> 2015-09-03 12:23:20,544 [IPC Parameter Sending Thread #0] DEBUG ipc.Client run - IPC Client (467473545) connection to mdcilabpen01.kdc.capitalone.com/10.24.28.46:8020 from zjb238 sending #14
> 2015-09-03 12:23:20,543 [9/HdfsOutHdht:HdfsFileOutputOperator] DEBUG hdfs.DFSClient writeChunkImpl - DFSClient writeChunk allocating new packet seqno=0, src=/tmp/citadel_out/latencies_hdht.txt.1441308190967.tmp, packetSize=65016, chunksPerPacket=126, bytesCurBlock=0
> 2015-09-03 12:23:20,546 [1/NewLines:NewlineFileInputOperator] ERROR fs.AbstractFileInputOperator failureHandling - FS reader error
> java.io.IOException: Filesystem closed
>     at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:808)
>     at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:830)
>     at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:896)
>     at java.io.DataInputStream.read(DataInputStream.java:149)
>     at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
>     at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
>     at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>     at java.io.InputStreamReader.read(InputStreamReader.java:184)
>     at java.io.BufferedReader.fill(BufferedReader.java:161)
>     at java.io.BufferedReader.readLine(BufferedReader.java:324)
>     at java.io.BufferedReader.readLine(BufferedReader.java:389)
>     at com.capitalone.vault8.citadel.operators.impl.NewlineFileInputOperator.readEntity(NewlineFileInputOperator.java:78)
>     at com.capitalone.vault8.citadel.operators.impl.NewlineFileInputOperator.readEntity(NewlineFileInputOperator.java:22)
>     at com.datatorrent.lib.io.fs.AbstractFileInputOperator.emitTuples(AbstractFileInputOperator.java:653)
>     at com.datatorrent.stram.engine.InputNode.run(InputNode.java:115)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1363)
