[ 
https://issues.apache.org/jira/browse/TAJO-1015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14133119#comment-14133119
 ] 

ASF GitHub Bot commented on TAJO-1015:
--------------------------------------

Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/124#discussion_r17517726
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---
    @@ -0,0 +1,449 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.worker;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.LocalDirAllocator;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.util.ReflectionUtils;
    +import org.apache.tajo.ExecutionBlockId;
    +import org.apache.tajo.QueryUnitAttemptId;
    +import org.apache.tajo.TajoProtos;
    +import org.apache.tajo.conf.TajoConf;
    +import org.apache.tajo.engine.query.QueryContext;
    +import org.apache.tajo.ipc.QueryMasterProtocol;
    +import org.apache.tajo.rpc.NettyClientBase;
    +import org.apache.tajo.rpc.NullCallback;
    +import org.apache.tajo.rpc.RpcChannelFactory;
    +import org.apache.tajo.rpc.RpcConnectionPool;
    +import org.apache.tajo.storage.HashShuffleAppenderManager;
    +import org.apache.tajo.storage.StorageUtil;
    +import org.apache.tajo.util.Pair;
    +import org.apache.tajo.worker.event.TaskRunnerStartEvent;
    +import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
    +
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentMap;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
    +
    +public class ExecutionBlockContext {
    +  /** class logger */
    +  private static final Log LOG = 
LogFactory.getLog(ExecutionBlockContext.class);
    +
    +  private TaskRunnerManager manager;
    +  public AtomicInteger completedTasksNum = new AtomicInteger();
    +  public AtomicInteger succeededTasksNum = new AtomicInteger();
    +  public AtomicInteger killedTasksNum = new AtomicInteger();
    +  public AtomicInteger failedTasksNum = new AtomicInteger();
    +
    +  private ClientSocketChannelFactory channelFactory;
    +  // for temporal or intermediate files
    +  private FileSystem localFS;
    +  // for input files
    +  private FileSystem defaultFS;
    +  private ExecutionBlockId executionBlockId;
    +  private QueryContext queryContext;
    +  private String plan;
    +
    +  private ExecutionBlockSharedResource resource;
    +
    +  private TajoQueryEngine queryEngine;
    +  private RpcConnectionPool connPool;
    +  private InetSocketAddress qmMasterAddr;
    +  private TajoConf systemConf;
    +  // for the doAs block
    +  private UserGroupInformation taskOwner;
    +
    +  private Reporter reporter;
    +
    +  private AtomicBoolean stop = new AtomicBoolean();
    +
    +  // It keeps all of the query unit attempts while a TaskRunner is running.
    +  private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = 
Maps.newConcurrentMap();
    +
    +  private final ConcurrentMap<String, TaskRunnerHistory> histories = 
Maps.newConcurrentMap();
    +
    +  public ExecutionBlockContext(TaskRunnerManager manager, 
TaskRunnerStartEvent event, InetSocketAddress queryMaster)
    +      throws Throwable {
    +    this.manager = manager;
    +    this.executionBlockId = event.getExecutionBlockId();
    +    this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
    +    this.qmMasterAddr = queryMaster;
    +    this.systemConf = manager.getTajoConf();
    +    this.reporter = new Reporter();
    +    this.defaultFS = 
TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
    +    this.localFS = FileSystem.getLocal(systemConf);
    +
    +    // Setup QueryEngine according to the query plan
    +    // Here, we can setup row-based query engine or columnar query engine.
    +    this.queryEngine = new TajoQueryEngine(systemConf);
    +    this.queryContext = event.getQueryContext();
    +    this.plan = event.getPlan();
    +    this.resource = new ExecutionBlockSharedResource();
    +
    +    init();
    +  }
    +
    +  public void init() throws Throwable {
    +
    +    LOG.info("Tajo Root Dir: " + 
systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
    +    LOG.info("Worker Local Dir: " + 
systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
    +
    +    LOG.info("QueryMaster Address:" + qmMasterAddr);
    +
    +    UserGroupInformation.setConfiguration(systemConf);
    +    // TODO - 'load credential' should be implemented
    +    // Getting taskOwner
    +    UserGroupInformation taskOwner = 
UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
    +    //taskOwner.addToken(token);
    +
    +    // initialize MasterWorkerProtocol as an actual task owner.
    +//      this.client =
    --- End diff --
    
    Could you remove the commented-out lines?


> Add executionblock event in worker
> ----------------------------------
>
>                 Key: TAJO-1015
>                 URL: https://issues.apache.org/jira/browse/TAJO-1015
>             Project: Tajo
>          Issue Type: Sub-task
>          Components: worker
>            Reporter: Jinho Kim
>            Assignee: Jinho Kim
>
> * add ExecutionBlock start/stop event
> * add shareable context of ExecutionBlock



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to