Github user sohami commented on a diff in the pull request: https://github.com/apache/drill/pull/978#discussion_r148095543 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ExecutionControls; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import io.netty.buffer.DrillBuf; + +/** + * Implementation of {@link OperatorExecContext} that provides services + * needed by most run-time operators. Excludes services that need the + * entire Drillbit. This class provides services common to the test-time + * version of the operator context and the full production-time context + * that includes network services. + */ + +public abstract class BaseOperatorContext implements OperatorContext { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class); + + protected final FragmentContextInterface context; + protected final BufferAllocator allocator; + protected final PhysicalOperator popConfig; + protected final BufferManager manager; + protected OperatorStatReceiver statsWriter; + private DrillFileSystem fs; + private ControlsInjector injector; + + public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator, + PhysicalOperator popConfig, + OperatorStatReceiver stats) { + this.context = context; + this.allocator = allocator; + this.popConfig = popConfig; + this.manager = new BufferManagerImpl(allocator); + statsWriter = stats; + } + + @Override + public FragmentContextInterface getFragmentContext() { + return context; + } + + @SuppressWarnings("unchecked") + @Override + public <T extends PhysicalOperator> T getOperatorDefn() { + return (T) popConfig; + } + + public String getName() { + return popConfig.getClass().getName(); + } + + @Override + public DrillBuf replace(DrillBuf old, int newSize) { + return manager.replace(old, newSize); + } + + @Override + public DrillBuf getManagedBuffer() { + return manager.getManagedBuffer(); + } + + @Override + public DrillBuf getManagedBuffer(int size) { + return manager.getManagedBuffer(size); + } + + @Override + public ExecutionControls getExecutionControls() { + return context.getExecutionControls(); + } + + @Override + public BufferAllocator getAllocator() { + if (allocator == null) { + throw new UnsupportedOperationException("Operator context does not have an allocator"); + } + return allocator; + } + + // Allow an operator to use the thread pool + @Override + public ExecutorService getExecutor() { + return context.getDrillbitContext().getExecutor(); + } + + @Override + public ExecutorService getScanExecutor() { + return context.getDrillbitContext().getScanExecutor(); + } + + @Override + public ExecutorService getScanDecodeExecutor() { + return context.getDrillbitContext().getScanDecodeExecutor(); + } + + @Override + public void setInjector(ControlsInjector injector) { + this.injector = injector; + } + + @Override + public ControlsInjector getInjector() { + return injector; + } + + @Override + public void injectUnchecked(String desc) { + ExecutionControls executionControls = context.getExecutionControls(); + if (injector != null && executionControls != null) { + injector.injectUnchecked(executionControls, desc); + } + } + + @Override + public <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass) + throws T { + ExecutionControls executionControls = context.getExecutionControls(); + if (injector != null && executionControls != null) { + injector.injectChecked(executionControls, desc, exceptionClass); + } + } + + @Override + public void close() { + RuntimeException ex = null; + try { + manager.close(); + } catch (RuntimeException e) { + ex = e; + } + try { + if (allocator != null) { + allocator.close(); + } + } catch (RuntimeException e) { + ex = ex == null ? e : ex; + } + try { + if (fs != null) { + fs.close(); + fs = null; + } + } catch (IOException e) { + throw UserException.resourceError(e) + .addContext("Failed to close the Drill file system for " + getName()) --- End diff -- how about adding context for the `exception ex` as well if it's not null
---