Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/124#discussion_r17517726
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---
    @@ -0,0 +1,449 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.worker;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.LocalDirAllocator;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.util.ReflectionUtils;
    +import org.apache.tajo.ExecutionBlockId;
    +import org.apache.tajo.QueryUnitAttemptId;
    +import org.apache.tajo.TajoProtos;
    +import org.apache.tajo.conf.TajoConf;
    +import org.apache.tajo.engine.query.QueryContext;
    +import org.apache.tajo.ipc.QueryMasterProtocol;
    +import org.apache.tajo.rpc.NettyClientBase;
    +import org.apache.tajo.rpc.NullCallback;
    +import org.apache.tajo.rpc.RpcChannelFactory;
    +import org.apache.tajo.rpc.RpcConnectionPool;
    +import org.apache.tajo.storage.HashShuffleAppenderManager;
    +import org.apache.tajo.storage.StorageUtil;
    +import org.apache.tajo.util.Pair;
    +import org.apache.tajo.worker.event.TaskRunnerStartEvent;
    +import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
    +
    +public class ExecutionBlockContext {
    +  /** class logger */
    +  private static final Log LOG = 
LogFactory.getLog(ExecutionBlockContext.class);
    +
    +  private TaskRunnerManager manager;
    +  public AtomicInteger completedTasksNum = new AtomicInteger();
    +  public AtomicInteger succeededTasksNum = new AtomicInteger();
    +  public AtomicInteger killedTasksNum = new AtomicInteger();
    +  public AtomicInteger failedTasksNum = new AtomicInteger();
    +
    +  private ClientSocketChannelFactory channelFactory;
    +  // for temporal or intermediate files
    +  private FileSystem localFS;
    +  // for input files
    +  private FileSystem defaultFS;
    +  private ExecutionBlockId executionBlockId;
    +  private QueryContext queryContext;
    +  private String plan;
    +
    +  private ExecutionBlockSharedResource resource;
    +
    +  private TajoQueryEngine queryEngine;
    +  private RpcConnectionPool connPool;
    +  private InetSocketAddress qmMasterAddr;
    +  private TajoConf systemConf;
    +  // for the doAs block
    +  private UserGroupInformation taskOwner;
    +
    +  private Reporter reporter;
    +
    +  private AtomicBoolean stop = new AtomicBoolean();
    +
    +  // It keeps all of the query unit attempts while a TaskRunner is running.
    +  private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = 
Maps.newConcurrentMap();
    +
    +  private final ConcurrentMap<String, TaskRunnerHistory> histories = 
Maps.newConcurrentMap();
    +
    +  public ExecutionBlockContext(TaskRunnerManager manager, 
TaskRunnerStartEvent event, InetSocketAddress queryMaster)
    +      throws Throwable {
    +    this.manager = manager;
    +    this.executionBlockId = event.getExecutionBlockId();
    +    this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
    +    this.qmMasterAddr = queryMaster;
    +    this.systemConf = manager.getTajoConf();
    +    this.reporter = new Reporter();
    +    this.defaultFS = 
TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
    +    this.localFS = FileSystem.getLocal(systemConf);
    +
    +    // Setup QueryEngine according to the query plan
    +    // Here, we can setup row-based query engine or columnar query engine.
    +    this.queryEngine = new TajoQueryEngine(systemConf);
    +    this.queryContext = event.getQueryContext();
    +    this.plan = event.getPlan();
    +    this.resource = new ExecutionBlockSharedResource();
    +
    +    init();
    +  }
    +
    +  public void init() throws Throwable {
    +
    +    LOG.info("Tajo Root Dir: " + 
systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
    +    LOG.info("Worker Local Dir: " + 
systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
    +
    +    LOG.info("QueryMaster Address:" + qmMasterAddr);
    +
    +    UserGroupInformation.setConfiguration(systemConf);
    +    // TODO - 'load credential' should be implemented
    +    // Getting taskOwner
    +    UserGroupInformation taskOwner = 
UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
    +    //taskOwner.addToken(token);
    +
    +    // initialize MasterWorkerProtocol as an actual task owner.
    +//      this.client =
    --- End diff --
    
    Could you remove the commented lines?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to