[
https://issues.apache.org/jira/browse/TAJO-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15251444#comment-15251444
]
ASF GitHub Bot commented on TAJO-2122:
--------------------------------------
Github user jinossy commented on a diff in the pull request:
https://github.com/apache/tajo/pull/1001#discussion_r60535225
--- Diff: tajo-core/src/main/java/org/apache/tajo/worker/LocalFetcher.java
---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.tajo.worker;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TajoProtos.FetcherState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.pullserver.PullServerConstants;
+import org.apache.tajo.pullserver.PullServerConstants.Param;
+import org.apache.tajo.pullserver.PullServerUtil;
+import org.apache.tajo.pullserver.PullServerUtil.PullServerParams;
+import
org.apache.tajo.pullserver.PullServerUtil.PullServerRequestURIBuilder;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.pullserver.retriever.FileChunkMeta;
+import org.apache.tajo.rpc.NettyUtils;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.StorageUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class LocalFetcher extends AbstractFetcher {
+
+ private final static Log LOG = LogFactory.getLog(LocalFetcher.class);
+
+// private final ExecutionBlockContext executionBlockContext;
+ private final TajoPullServerService pullServerService;
+
+ private final String host;
+ private int port;
+ private final Bootstrap bootstrap;
+ private final int maxUrlLength;
+ private final List<FileChunkMeta> chunkMetas = new ArrayList<>();
+ private final String tableName;
+ private final FileSystem localFileSystem;
+ private final LocalDirAllocator localDirAllocator;
+
+ @VisibleForTesting
+ public LocalFetcher(TajoConf conf, URI uri, String tableName) throws
IOException {
+ super(conf, uri);
+ this.maxUrlLength =
conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+ this.tableName = tableName;
+ this.localFileSystem = new LocalFileSystem();
+ this.localDirAllocator = new
LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+ this.pullServerService = null;
+
+ String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+ this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+ this.port = uri.getPort();
+ if (port == -1) {
+ if (scheme.equalsIgnoreCase("http")) {
+ this.port = 80;
+ } else if (scheme.equalsIgnoreCase("https")) {
+ this.port = 443;
+ }
+ }
+
+ bootstrap = new Bootstrap()
+ .group(
+ NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+
conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+ conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) *
1000)
+ .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+ .option(ChannelOption.TCP_NODELAY, true);
+ }
+
+ public LocalFetcher(TajoConf conf, URI uri, ExecutionBlockContext
executionBlockContext, String tableName) {
+ super(conf, uri);
+ this.localFileSystem = executionBlockContext.getLocalFS();
+ this.localDirAllocator = executionBlockContext.getLocalDirAllocator();
+ this.maxUrlLength =
conf.getIntVar(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH);
+ this.tableName = tableName;
+
+ Optional<TajoPullServerService> optional =
executionBlockContext.getSharedResource().getPullServerService();
+ if (optional.isPresent()) {
+ // local pull server service
+ this.pullServerService = optional.get();
+ this.host = null;
+ this.bootstrap = null;
+
+ } else if (PullServerUtil.useExternalPullServerService(conf)) {
+ // external pull server service
+ pullServerService = null;
+
+ String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
+ this.host = uri.getHost() == null ? "localhost" : uri.getHost();
+ this.port = uri.getPort();
+ if (port == -1) {
+ if (scheme.equalsIgnoreCase("http")) {
+ this.port = 80;
+ } else if (scheme.equalsIgnoreCase("https")) {
+ this.port = 443;
+ }
+ }
+
+ bootstrap = new Bootstrap()
+ .group(
+ NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
+
conf.getIntVar(ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
+ conf.getIntVar(ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) *
1000)
+ .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+ .option(ChannelOption.TCP_NODELAY, true);
+ } else {
+ endFetch(FetcherState.FETCH_FAILED);
+ throw new TajoInternalError("Pull server service is not
initialized");
+ }
+ }
+
+ @Override
+ public List<FileChunk> get() throws IOException {
+ return pullServerService != null ? getDirect() : getFromFetchURI();
--- End diff --
Why does the local fetcher connect to a remote server?
> PullServer as an Auxiliary service of Yarn
> ------------------------------------------
>
> Key: TAJO-2122
> URL: https://issues.apache.org/jira/browse/TAJO-2122
> Project: Tajo
> Issue Type: New Feature
> Components: Pull Server
> Reporter: Jihoon Son
> Assignee: Jihoon Son
> Fix For: 0.12.0
>
>
> We are going to support Yarn as one of Tajo's resource schedulers. To do so,
> the PullServer should be capable of running as an Auxiliary service.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)