[
https://issues.apache.org/jira/browse/TAJO-1015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14133119#comment-14133119
]
ASF GitHub Bot commented on TAJO-1015:
--------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/124#discussion_r17517726
--- Diff:
tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---
@@ -0,0 +1,449 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.util.Pair;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+public class ExecutionBlockContext {
+ /** class logger */
+ private static final Log LOG =
LogFactory.getLog(ExecutionBlockContext.class);
+
+ private TaskRunnerManager manager;
+ public AtomicInteger completedTasksNum = new AtomicInteger();
+ public AtomicInteger succeededTasksNum = new AtomicInteger();
+ public AtomicInteger killedTasksNum = new AtomicInteger();
+ public AtomicInteger failedTasksNum = new AtomicInteger();
+
+ private ClientSocketChannelFactory channelFactory;
+ // for temporal or intermediate files
+ private FileSystem localFS;
+ // for input files
+ private FileSystem defaultFS;
+ private ExecutionBlockId executionBlockId;
+ private QueryContext queryContext;
+ private String plan;
+
+ private ExecutionBlockSharedResource resource;
+
+ private TajoQueryEngine queryEngine;
+ private RpcConnectionPool connPool;
+ private InetSocketAddress qmMasterAddr;
+ private TajoConf systemConf;
+ // for the doAs block
+ private UserGroupInformation taskOwner;
+
+ private Reporter reporter;
+
+ private AtomicBoolean stop = new AtomicBoolean();
+
+ // It keeps all of the query unit attempts while a TaskRunner is running.
+ private final ConcurrentMap<QueryUnitAttemptId, Task> tasks =
Maps.newConcurrentMap();
+
+ private final ConcurrentMap<String, TaskRunnerHistory> histories =
Maps.newConcurrentMap();
+
+ public ExecutionBlockContext(TaskRunnerManager manager,
TaskRunnerStartEvent event, InetSocketAddress queryMaster)
+ throws Throwable {
+ this.manager = manager;
+ this.executionBlockId = event.getExecutionBlockId();
+ this.connPool = RpcConnectionPool.getPool(manager.getTajoConf());
+ this.qmMasterAddr = queryMaster;
+ this.systemConf = manager.getTajoConf();
+ this.reporter = new Reporter();
+ this.defaultFS =
TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+ this.localFS = FileSystem.getLocal(systemConf);
+
+ // Setup QueryEngine according to the query plan
+ // Here, we can setup row-based query engine or columnar query engine.
+ this.queryEngine = new TajoQueryEngine(systemConf);
+ this.queryContext = event.getQueryContext();
+ this.plan = event.getPlan();
+ this.resource = new ExecutionBlockSharedResource();
+
+ init();
+ }
+
+ public void init() throws Throwable {
+
+ LOG.info("Tajo Root Dir: " +
systemConf.getVar(TajoConf.ConfVars.ROOT_DIR));
+ LOG.info("Worker Local Dir: " +
systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR));
+
+ LOG.info("QueryMaster Address:" + qmMasterAddr);
+
+ UserGroupInformation.setConfiguration(systemConf);
+ // TODO - 'load credential' should be implemented
+ // Getting taskOwner
+ UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME));
+ //taskOwner.addToken(token);
+
+ // initialize MasterWorkerProtocol as an actual task owner.
+// this.client =
--- End diff --
Could you remove the commented lines?
> Add executionblock event in worker
> ----------------------------------
>
> Key: TAJO-1015
> URL: https://issues.apache.org/jira/browse/TAJO-1015
> Project: Tajo
> Issue Type: Sub-task
> Components: worker
> Reporter: Jinho Kim
> Assignee: Jinho Kim
>
> * add ExecutionBlock start/stop event
> * add shareable context of ExecutionBlock
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)