[ https://issues.apache.org/jira/browse/TAJO-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15251551#comment-15251551 ]

ASF GitHub Bot commented on TAJO-2122:
--------------------------------------

Github user jinossy commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/1001#discussion_r60542686
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java 
---
    @@ -0,0 +1,451 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +  package org.apache.tajo.worker;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.gson.Gson;
    +import io.netty.bootstrap.Bootstrap;
    +import io.netty.buffer.ByteBuf;
    +import io.netty.channel.*;
    +import io.netty.channel.socket.nio.NioSocketChannel;
    +import io.netty.handler.codec.http.*;
    +import io.netty.handler.timeout.ReadTimeoutException;
    +import io.netty.handler.timeout.ReadTimeoutHandler;
    +import io.netty.util.ReferenceCountUtil;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.LocalDirAllocator;
    +import org.apache.hadoop.fs.LocalFileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.tajo.TajoProtos;
    +import org.apache.tajo.TajoProtos.FetcherState;
    +import org.apache.tajo.conf.TajoConf;
    +import org.apache.tajo.conf.TajoConf.ConfVars;
    +import org.apache.tajo.exception.TajoInternalError;
    +import org.apache.tajo.pullserver.PullServerConstants;
    +import org.apache.tajo.pullserver.PullServerConstants.Param;
    +import org.apache.tajo.pullserver.PullServerUtil;
    +import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
    +import 
org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
    +import org.apache.tajo.pullserver.TajoPullServerService;
    +import org.apache.tajo.pullserver.retriever.FileChunk;
    +import org.apache.tajo.pullserver.retriever.FileChunkMeta;
    +import org.apache.tajo.rpc.NettyUtils;
    +import org.apache.tajo.storage.HashShuffleAppenderManager;
    +import org.apache.tajo.storage.StorageUtil;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.net.URI;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Optional;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +public class LocalFetcher extends AbstractFetcher {
    +
    +  private final static Log LOG = LogFactory.getLog(LocalFetcher.class);
    +
    +//  private final ExecutionBlockContext executionBlockContext;
    +  private final TajoPullServerService pullServerService;
    +
    +  private final String host;
    +  private int port;
    +  private final Bootstrap bootstrap;
    +  private final int maxUrlLength;
    +  private final List<FileChunkMeta> chunkMetas = new ArrayList<>();
    +  private final String tableName;
    +  private final FileSystem localFileSystem;
    +  private final LocalDirAllocator localDirAllocator;
    +
    +  @VisibleForTesting
    +  public LocalFetcher(TajoConf conf, URI uri, String tableName) throws 
IOException {
    +    super(conf, uri);
    +    this.maxUrlLength = 
conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
    +    this.tableName = tableName;
    +    this.localFileSystem = new LocalFileSystem();
    +    this.localDirAllocator = new 
LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
    +    this.pullServerService = null;
    +
    +    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
    +    this.host = uri.getHost() == null ? "localhost" : uri.getHost();
    +    this.port = uri.getPort();
    +    if (port == -1) {
    +      if (scheme.equalsIgnoreCase("http")) {
    +        this.port = 80;
    +      } else if (scheme.equalsIgnoreCase("https")) {
    +        this.port = 443;
    +      }
    +    }
    +
    +    bootstrap = new Bootstrap()
    +        .group(
    +            NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
    +                
conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
    +        .channel(NioSocketChannel.class)
    +        .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
    +        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
    +            conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 
1000)
    +        .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
    +        .option(ChannelOption.TCP_NODELAY, true);
    +  }
    +
    +  public LocalFetcher(TajoConf conf, URI uri, ExecutionBlockContext 
executionBlockContext, String tableName) {
    +    super(conf, uri);
    +    this.localFileSystem = executionBlockContext.getLocalFS();
    +    this.localDirAllocator = executionBlockContext.getLocalDirAllocator();
    +    this.maxUrlLength = 
conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
    +    this.tableName = tableName;
    +
    +    Optional<TajoPullServerService> optional = 
executionBlockContext.getSharedResource().getPullServerService();
    +    if (optional.isPresent()) {
    +      // local pull server service
    +      this.pullServerService = optional.get();
    +      this.host = null;
    +      this.bootstrap = null;
    +
    +    } else if (PullServerUtil.useExternalPullServerService(conf)) {
    +      // external pull server service
    +      pullServerService = null;
    +
    +      String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
    +      this.host = uri.getHost() == null ? "localhost" : uri.getHost();
    +      this.port = uri.getPort();
    +      if (port == -1) {
    +        if (scheme.equalsIgnoreCase("http")) {
    +          this.port = 80;
    +        } else if (scheme.equalsIgnoreCase("https")) {
    +          this.port = 443;
    +        }
    +      }
    +
    +      bootstrap = new Bootstrap()
    +          .group(
    +              NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
    +                  
conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
    +          .channel(NioSocketChannel.class)
    +          .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
    +          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
    +              conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 
1000)
    +          .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
    +          .option(ChannelOption.TCP_NODELAY, true);
    +    } else {
    +      endFetch(FetcherState.FETCH_FAILED);
    +      throw new TajoInternalError("Pull server service is not 
initialized");
    +    }
    +  }
    +
    +  @Override
    +  public List<FileChunk> get() throws IOException {
    +    return pullServerService != null ? getDirect() : getFromFetchURI();
    --- End diff --
    
    OK, thanks


> PullServer as an Auxiliary service of Yarn
> ------------------------------------------
>
>                 Key: TAJO-2122
>                 URL: https://issues.apache.org/jira/browse/TAJO-2122
>             Project: Tajo
>          Issue Type: New Feature
>          Components: Pull Server
>            Reporter: Jihoon Son
>            Assignee: Jihoon Son
>             Fix For: 0.12.0
>
>
> We are going to support YARN as one of Tajo's resource schedulers. To do so,
> the PullServer should be able to run as a YARN auxiliary service.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to