[
https://issues.apache.org/jira/browse/TAJO-908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14058310#comment-14058310
]
ASF GitHub Bot commented on TAJO-908:
-------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/58#discussion_r14805622
--- Diff: tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
---
@@ -96,29 +113,83 @@ public void testAdjustFetchProcess() {
@Test
public void testStatus() throws Exception {
Random rnd = new Random();
- FileWriter writer = new FileWriter(INPUT_DIR + "data");
- String data;
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" +
sid + "/" +ta + "/output/" + partId;
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s",
queryId, sid, partId, "h", ta);
+
+ FSDataOutputStream stream = LocalFileSystem.get(conf).create(new
Path(dataPath), true);
for (int i = 0; i < 100; i++) {
- data = ""+rnd.nextInt();
- writer.write(data);
+ String data = ""+rnd.nextInt();
+ stream.write(data.getBytes());
}
- writer.flush();
- writer.close();
+ stream.flush();
+ stream.close();
+
+ URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort()
+ "/?" + params);
+ final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR +
"data"), channelFactory);
+ assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+ fetcher.get();
+ assertEquals(TajoProtos.FetcherState.FETCH_FINISHED,
fetcher.getState());
+ }
- DataRetriever ret = new DirectoryRetriever(INPUT_DIR);
- final HttpDataServer server = new HttpDataServer(
- NetUtils.createSocketAddr("127.0.0.1:0"), ret);
- server.start();
- InetSocketAddress addr = server.getBindAddress();
+ @Test
+ public void testEmptyFileTask() throws Exception {
+
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" +
sid + "/" +ta + "/output/" + partId;
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s",
queryId, sid, partId, "h", ta);
+
+ Path inputPath = new Path(dataPath);
+ if(LocalFileSystem.get(conf).exists(inputPath)){
+ LocalFileSystem.get(conf).delete(new Path(dataPath), true);
+ }
- URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data");
- ClientSocketChannelFactory channelFactory =
RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
+ FSDataOutputStream stream = LocalFileSystem.get(conf).create(new
Path(dataPath).getParent(), true);
+ stream.close();
- final Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR +
"data"), channelFactory);
+ URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort()
+ "/?" + params);
+ final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR +
"data"), channelFactory);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
assertEquals(TajoProtos.FetcherState.FETCH_FINISHED,
fetcher.getState());
- server.stop();
+ }
+
+ @Test
+ public void testFailureStatus() throws Exception {
+ Random rnd = new Random();
+
+ QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+ String sid = "1";
+ String ta = "1_0";
+ String partId = "1";
+
+ String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" +
sid + "/" +ta + "/output/" + partId;
+ String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s",
queryId, sid, partId, "x", ta);
--- End diff --
The fetch failure here is caused by the shuffle type character 'x', because
only 'h', 'r', and 's' are allowed. However, this is hard for most contributors
to know. Could you add a brief comment explaining it?
> Fetcher does not retry, when pull server connection was closed
> --------------------------------------------------------------
>
> Key: TAJO-908
> URL: https://issues.apache.org/jira/browse/TAJO-908
> Project: Tajo
> Issue Type: Bug
> Components: data shuffle
> Affects Versions: 0.8.0, 0.9.0
> Reporter: Jinho Kim
> Assignee: Jinho Kim
>
> Exception message as follows:
> {code:title=PullServer}
> 2014-07-04 09:45:21,150 ERROR pullserver.TajoPullServerService
> (TajoPullServerService.java:exceptionCaught(533)) - PullServer error:
> java.io.IOException: Connection timed out
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
> at
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> {code:title=Fetcher}
> org.jboss.netty.channel.SimpleChannelUpstreamHandler
> WARN: EXCEPTION, please implement
> org.apache.tajo.worker.Fetcher$HttpClientHandler.exceptionCaught() for proper
> handling.
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> at
> org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
> at
> org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
> at
> org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 2014-07-04 10:56:44,561 INFO worker.Task (Task.java:waitForFetch(375)) -
> ta_1404438787481_0001_000004_000029_00 All fetches are done!
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)