http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java 
b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
new file mode 100644
index 0000000..fb70f89
--- /dev/null
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.Charset;
+
+import org.apache.drill.exec.memory.Accountor;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.util.AssertionUtil;
+
+public final class DrillBuf extends AbstractByteBuf {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
+
+  private final ByteBuf b;
+  private final long addr;
+  private final int offset;
+  private final boolean rootBuffer;
+
+  private volatile BufferAllocator allocator;
+  private volatile Accountor acct;
+  private volatile int length;
+  private final boolean emptyBuffer;
+  private OperatorContext context;
+  private FragmentContext fContext;
+
+
+  public DrillBuf(BufferAllocator allocator, Accountor a, 
UnsafeDirectLittleEndian b) {
+    super(b.maxCapacity());
+    this.b = b;
+    this.addr = b.memoryAddress();
+    this.acct = a;
+    this.length = b.capacity();
+    this.offset = 0;
+    this.rootBuffer = true;
+    this.allocator = allocator;
+    this.emptyBuffer = false;
+  }
+
+  private DrillBuf(ByteBuffer bb){
+    super(bb.remaining());
+    UnpooledUnsafeDirectByteBuf bytebuf = new 
UnpooledUnsafeDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, bb, 
bb.remaining());
+    this.acct = FakeAllocator.FAKE_ACCOUNTOR;
+    this.addr = bytebuf.memoryAddress();
+    this.allocator = FakeAllocator.FAKE_ALLOCATOR;
+    this.b = bytebuf;
+    this.emptyBuffer = false;
+    this.length = bytebuf.capacity();
+    this.offset = 0;
+    this.rootBuffer = true;
+    this.writerIndex(bb.remaining());
+  }
+
+  private DrillBuf(BufferAllocator allocator, Accountor a){
+    super(0);
+    this.b = new 
EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
+    this.allocator = allocator;
+    this.acct = a;
+    this.length = 0;
+    this.addr = 0;
+    this.rootBuffer = true;
+    this.offset = 0;
+    this.emptyBuffer = true;
+  }
+
+  private DrillBuf(DrillBuf buffer, int index, int length) {
+    super(length);
+    if (index < 0 || index > buffer.capacity() - length) {
+      throw new IndexOutOfBoundsException(buffer.toString() + ".slice(" + 
index + ", " + length + ')');
+    }
+    this.emptyBuffer = false;
+    this.length = length;
+    writerIndex(length);
+
+    this.b = buffer;
+    this.addr = buffer.memoryAddress() + index;
+    this.offset = index;
+    this.acct = null;
+    this.length = length;
+    this.rootBuffer = false;
+    this.allocator = buffer.allocator;
+  }
+
+  public void setOperatorContext(OperatorContext c){
+    this.context = c;
+  }
+  public void setFragmentContext(FragmentContext c){
+    this.fContext = c;
+  }
+
+  public BufferAllocator getAllocator(){
+    return allocator;
+  }
+
+  public DrillBuf reallocIfNeeded(int size){
+    if(this.capacity() >= size) return this;
+    if(context != null){
+      return context.replace(this, size);
+    }else if(fContext != null){
+      return fContext.replace(this, size);
+    }else{
+      throw new UnsupportedOperationException("Realloc is only available in 
the context of an operator's UDFs");
+    }
+
+  }
+
+  @Override
+  public int refCnt() {
+    return b.refCnt();
+  }
+
+  private long addr(int index) {
+    return addr + index;
+  }
+
+
+
+  private final void checkIndexD(int index) {
+      ensureAccessible();
+      if (index < 0 || index >= capacity()) {
+          throw new IndexOutOfBoundsException(String.format(
+                  "index: %d (expected: range(0, %d))", index, capacity()));
+      }
+  }
+
+  private final void checkIndexD(int index, int fieldLength) {
+      ensureAccessible();
+      if (fieldLength < 0) {
+          throw new IllegalArgumentException("length: " + fieldLength + " 
(expected: >= 0)");
+      }
+      if (index < 0 || index > capacity() - fieldLength) {
+          throw new IndexOutOfBoundsException(String.format(
+                  "index: %d, length: %d (expected: range(0, %d))", index, 
fieldLength, capacity()));
+      }
+  }
+
+  private void chk(int index, int width) {
+    if (AssertionUtil.isAssertionsEnabled()) {
+      checkIndexD(offset+index, width);
+    }
+  }
+
+  private void chk(int index) {
+    if (AssertionUtil.isAssertionsEnabled()) {
+      checkIndexD(index);
+    }
+  }
+
+  private void ensure(int width) {
+    if (AssertionUtil.isAssertionsEnabled()) {
+      ensureWritable(width);
+    }
+  }
+
+  public boolean transferAccounting(Accountor target) {
+    if (rootBuffer) {
+      boolean outcome = acct.transferTo(target, this, length);
+      acct = target;
+      return outcome;
+    } else {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public synchronized boolean release() {
+    if (b.release() && rootBuffer) {
+      acct.release(this, length);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized boolean release(int decrement) {
+    if (b.release(decrement) && rootBuffer) {
+      acct.release(this, length);
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int capacity() {
+    return length;
+  }
+
+
+  @Override
+  public synchronized ByteBuf capacity(int newCapacity) {
+    if (rootBuffer) {
+      if (newCapacity == length) {
+        return this;
+      } else if (newCapacity < length) {
+        b.capacity(newCapacity);
+        int diff = length - b.capacity();
+        acct.releasePartial(this, diff);
+        this.length = length - diff;
+        return this;
+      } else {
+        throw new UnsupportedOperationException("Accounting byte buf doesn't 
support increasing allocations.");
+      }
+    } else {
+      throw new UnsupportedOperationException("Non root bufs doen't support 
changing allocations.");
+    }
+  }
+
+  @Override
+  public int maxCapacity() {
+    return length;
+  }
+
+  @Override
+  public ByteBufAllocator alloc() {
+    return b.alloc();
+  }
+
+  @Override
+  public ByteOrder order() {
+    return ByteOrder.LITTLE_ENDIAN;
+  }
+
+  @Override
+  public ByteBuf order(ByteOrder endianness) {
+    // if(endianness != ByteOrder.LITTLE_ENDIAN) throw new
+    // UnsupportedOperationException("Drill buffers only support little 
endian.");
+    return this;
+  }
+
+  @Override
+  public ByteBuf unwrap() {
+    return b;
+  }
+
+  @Override
+  public boolean isDirect() {
+    return true;
+  }
+
+  @Override
+  public ByteBuf readBytes(int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ByteBuf readSlice(int length) {
+    ByteBuf slice = slice(readerIndex(), length);
+    readerIndex(readerIndex() + length);
+    return slice;
+  }
+
+  @Override
+  public ByteBuf copy() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ByteBuf copy(int index, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ByteBuf slice() {
+    return slice(readerIndex(), readableBytes());
+  }
+
+  @Override
+  public DrillBuf slice(int index, int length) {
+    DrillBuf buf = new DrillBuf(this, index, length);
+    buf.writerIndex = length;
+    return buf;
+  }
+
+  @Override
+  public DrillBuf duplicate() {
+    return new DrillBuf(this, 0, length);
+  }
+
+  @Override
+  public int nioBufferCount() {
+    return 1;
+  }
+
+  @Override
+  public ByteBuffer nioBuffer() {
+    return nioBuffer(readerIndex(), readableBytes());
+  }
+
+  @Override
+  public ByteBuffer nioBuffer(int index, int length) {
+    return b.nioBuffer(offset + index, length);
+  }
+
+  @Override
+  public ByteBuffer internalNioBuffer(int index, int length) {
+    return b.internalNioBuffer(offset + index, length);
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers() {
+    return new ByteBuffer[]{nioBuffer()};
+  }
+
+  @Override
+  public ByteBuffer[] nioBuffers(int index, int length) {
+    return new ByteBuffer[]{nioBuffer(index, length)};
+  }
+
+  @Override
+  public boolean hasArray() {
+    return b.hasArray();
+  }
+
+  @Override
+  public byte[] array() {
+    return b.array();
+  }
+
+  @Override
+  public int arrayOffset() {
+    return b.arrayOffset();
+  }
+
+  @Override
+  public boolean hasMemoryAddress() {
+    return true;
+  }
+
+  @Override
+  public long memoryAddress() {
+    return this.addr;
+  }
+
+  @Override
+  public String toString(Charset charset) {
+      return toString(readerIndex, readableBytes(), charset);
+  }
+
+  @Override
+  public String toString(int index, int length, Charset charset) {
+      if (length == 0) {
+          return "";
+      }
+
+      ByteBuffer nioBuffer;
+      if (nioBufferCount() == 1) {
+          nioBuffer = nioBuffer(index, length);
+      } else {
+          nioBuffer = ByteBuffer.allocate(length);
+          getBytes(index, nioBuffer);
+          nioBuffer.flip();
+      }
+
+      return ByteBufUtil.decodeString(nioBuffer, charset);
+  }
+
+  @Override
+  public int hashCode() {
+    return System.identityHashCode(this);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // identity equals only.
+    return this == obj;
+  }
+
+  @Override
+  public ByteBuf retain(int increment) {
+    b.retain(increment);
+    return this;
+  }
+
+  @Override
+  public ByteBuf retain() {
+    b.retain();
+    return this;
+  }
+
+  @Override
+  public long getLong(int index) {
+    chk(index, 8);
+    long v = PlatformDependent.getLong(addr(index));
+    return v;
+  }
+
+  @Override
+  public float getFloat(int index) {
+    return Float.intBitsToFloat(getInt(index));
+  }
+
+  @Override
+  public double getDouble(int index) {
+    return Double.longBitsToDouble(getLong(index));
+  }
+
+  @Override
+  public char getChar(int index) {
+    return (char) getShort(index);
+  }
+
+  @Override
+  public long getUnsignedInt(int index) {
+    return getInt(index) & 0xFFFFFFFFL;
+  }
+
+  @Override
+  public int getInt(int index) {
+    chk(index, 4);
+    int v = PlatformDependent.getInt(addr(index));
+    return v;
+  }
+
+  @Override
+  public int getUnsignedShort(int index) {
+    return getShort(index) & 0xFFFF;
+  }
+
+  @Override
+  public short getShort(int index) {
+    chk(index, 2);
+    short v = PlatformDependent.getShort(addr(index));
+    return v;
+  }
+
+  @Override
+  public ByteBuf setShort(int index, int value) {
+    chk(index, 2);
+    PlatformDependent.putShort(addr(index), (short) value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setInt(int index, int value) {
+    chk(index, 4);
+    PlatformDependent.putInt(addr(index), value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setLong(int index, long value) {
+    chk(index, 8);
+    PlatformDependent.putLong(addr(index), value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setChar(int index, int value) {
+    chk(index, 2);
+    PlatformDependent.putShort(addr(index), (short) value);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setFloat(int index, float value) {
+    chk(index, 4);
+    PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value));
+    return this;
+  }
+
+  @Override
+  public ByteBuf setDouble(int index, double value) {
+    chk(index, 8);
+    PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value));
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeShort(int value) {
+    ensure(2);
+    PlatformDependent.putShort(addr(writerIndex), (short) value);
+    writerIndex += 2;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeInt(int value) {
+    ensure(4);
+    PlatformDependent.putInt(addr(writerIndex), value);
+    writerIndex += 4;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeLong(long value) {
+    ensure(8);
+    PlatformDependent.putLong(addr(writerIndex), value);
+    writerIndex += 8;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeChar(int value) {
+    ensure(2);
+    PlatformDependent.putShort(addr(writerIndex), (short) value);
+    writerIndex += 2;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeFloat(float value) {
+    ensure(4);
+    PlatformDependent.putInt(addr(writerIndex), 
Float.floatToRawIntBits(value));
+    writerIndex += 4;
+    return this;
+  }
+
+  @Override
+  public ByteBuf writeDouble(double value) {
+    ensure(8);
+    PlatformDependent.putLong(addr(writerIndex), 
Double.doubleToRawLongBits(value));
+    writerIndex += 8;
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+    b.getBytes(index + offset,  dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuffer dst) {
+    b.getBytes(index + offset, dst);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setByte(int index, int value) {
+    PlatformDependent.putByte(addr(index), (byte) value);
+    return this;
+  }
+
+  @Override
+  protected byte _getByte(int index) {
+    return getByte(index);
+  }
+
+  @Override
+  protected short _getShort(int index) {
+    return getShort(index);
+  }
+
+  @Override
+  protected int _getInt(int index) {
+    return getInt(index);
+  }
+
+  @Override
+  protected long _getLong(int index) {
+    return getLong(index);
+  }
+
+  @Override
+  protected void _setByte(int index, int value) {
+    setByte(index, value);
+  }
+
+  @Override
+  protected void _setShort(int index, int value) {
+    setShort(index, value);
+  }
+
+  @Override
+  protected void _setMedium(int index, int value) {
+    setMedium(index, value);
+  }
+
+  @Override
+  protected void _setInt(int index, int value) {
+    setInt(index, value);
+  }
+
+  @Override
+  protected void _setLong(int index, long value) {
+    setLong(index, value);
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+    b.getBytes(index + offset, dst, dstIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf getBytes(int index, OutputStream out, int length) throws 
IOException {
+    b.getBytes(index + offset, out, length);
+    return this;
+  }
+
+  @Override
+  protected int _getUnsignedMedium(int index) {
+      long addr = addr(index);
+      return (PlatformDependent.getByte(addr) & 0xff) << 16 |
+              (PlatformDependent.getByte(addr + 1) & 0xff) << 8 |
+              PlatformDependent.getByte(addr + 2) & 0xff;
+  }
+
+  @Override
+  public int getBytes(int index, GatheringByteChannel out, int length) throws 
IOException {
+    return b.getBytes(index + offset, out, length);
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+    b.setBytes(index + offset, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+    b.setBytes(index + offset, src, srcIndex, length);
+    return this;
+  }
+
+  @Override
+  public ByteBuf setBytes(int index, ByteBuffer src) {
+    b.setBytes(index + offset, src);
+    return this;
+  }
+
+  @Override
+  public int setBytes(int index, InputStream in, int length) throws 
IOException {
+    return b.setBytes(index + offset, in, length);
+  }
+
+  @Override
+  public int setBytes(int index, ScatteringByteChannel in, int length) throws 
IOException {
+    return b.setBytes(index + offset, in, length);
+  }
+
+  @Override
+  public byte getByte(int index) {
+    return PlatformDependent.getByte(addr(index));
+  }
+
+  public static DrillBuf getEmpty(BufferAllocator allocator, Accountor a){
+    return new DrillBuf(allocator, a);
+  }
+
+  public static DrillBuf wrapByteBuffer(ByteBuffer b){
+    if(!b.isDirect()){
+      throw new IllegalStateException("DrillBufs can only refer to direct 
memory.");
+    }else{
+      return new DrillBuf(b);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java 
b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
new file mode 100644
index 0000000..bc69577
--- /dev/null
+++ b/exec/java-exec/src/main/java/io/netty/buffer/FakeAllocator.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import org.apache.drill.exec.memory.Accountor;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+class FakeAllocator implements BufferAllocator {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FakeAllocator.class);
+
+
+  public static final Accountor FAKE_ACCOUNTOR = new FakeAccountor();
+  public static final BufferAllocator FAKE_ALLOCATOR = new FakeAllocator();
+
+  @Override
+  public DrillBuf buffer(int size) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public DrillBuf buffer(int minSize, int maxSize) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ByteBufAllocator getUnderlyingAllocator() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BufferAllocator getChildAllocator(FragmentHandle handle, long 
initialReservation, long maximumReservation)
+      throws OutOfMemoryException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public DrillBuf getEmpty() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean takeOwnership(DrillBuf buf) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PreAllocator getNewPreAllocator() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public long getAllocatedMemory() {
+    return 0;
+  }
+
+  static class FakeAccountor extends Accountor {
+
+    public FakeAccountor() {
+      super(false, null, null, 0, 0);
+    }
+
+    @Override
+    public boolean transferTo(Accountor target, DrillBuf buf, long size) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getAvailable() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getCapacity() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getAllocation() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean reserve(long size) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean forceAdditionalReservation(long size) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reserved(long expected, DrillBuf buf) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void releasePartial(DrillBuf buf, long size) {
+
+    }
+
+    @Override
+    public void release(DrillBuf buf, long size) {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java 
b/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index a8482ae..c0de544 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -58,7 +58,7 @@ public class PooledByteBufAllocatorL extends 
PooledByteBufAllocator{
 
 
   @Override
-  public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+  public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int 
maxCapacity) {
       if (initialCapacity == 0 && maxCapacity == 0) {
           newDirectBuffer(initialCapacity, maxCapacity);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java 
b/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index ad85a63..86d3b59 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -22,7 +22,9 @@ import io.netty.util.internal.PlatformDependent;
 
 import java.nio.ByteOrder;
 
-final class UnsafeDirectLittleEndian extends WrappedByteBuf {
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
     private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN;
     private final PooledUnsafeDirectByteBuf wrapped;
     private final long memoryAddress;
@@ -40,7 +42,7 @@ final class UnsafeDirectLittleEndian extends WrappedByteBuf {
 
     @Override
     public long getLong(int index) {
-        wrapped.checkIndex(index, 8);
+//        wrapped.checkIndex(index, 8);
         long v = PlatformDependent.getLong(addr(index));
         return v;
     }
@@ -67,7 +69,7 @@ final class UnsafeDirectLittleEndian extends WrappedByteBuf {
 
     @Override
     public int getInt(int index) {
-        wrapped.checkIndex(index, 4);
+//        wrapped.checkIndex(index, 4);
         int v = PlatformDependent.getInt(addr(index));
         return v;
     }
@@ -79,7 +81,7 @@ final class UnsafeDirectLittleEndian extends WrappedByteBuf {
 
     @Override
     public short getShort(int index) {
-        wrapped.checkIndex(index, 2);
+//        wrapped.checkIndex(index, 2);
         short v = PlatformDependent.getShort(addr(index));
         return v;
     }
@@ -176,4 +178,18 @@ final class UnsafeDirectLittleEndian extends 
WrappedByteBuf {
     private void _setLong(int index, long value) {
         PlatformDependent.putLong(addr(index), value);
     }
+
+    @Override
+    public byte getByte(int index) {
+      return PlatformDependent.getByte(addr(index));
+    }
+
+    @Override
+    public ByteBuf setByte(int index, int value) {
+      PlatformDependent.putByte(addr(index), (byte) value);
+      return this;
+    }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index cff56d6..e007bcc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.cache;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -112,7 +113,7 @@ public class VectorAccessibleSerializable extends 
AbstractStreamSerializable {
     for (SerializedField metaData : fieldList) {
       int dataLength = metaData.getBufferLength();
       MaterializedField field = MaterializedField.create(metaData);
-      ByteBuf buf = allocator.buffer(dataLength);
+      DrillBuf buf = allocator.buffer(dataLength);
       buf.writeBytes(input, dataLength);
       ValueVector vector = TypeHelper.getNewVector(field, allocator);
       vector.load(metaData, buf);
@@ -146,7 +147,7 @@ public class VectorAccessibleSerializable extends 
AbstractStreamSerializable {
     UserBitShared.RecordBatchDef batchDef = batch.getDef();
 
         /* ByteBuf associated with the selection vector */
-    ByteBuf svBuf = null;
+    DrillBuf svBuf = null;
     Integer svCount =  null;
 
     if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ByteCodeLoader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ByteCodeLoader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ByteCodeLoader.java
index 36db0ed..6488699 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ByteCodeLoader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ByteCodeLoader.java
@@ -31,8 +31,8 @@ import com.google.common.io.Resources;
 
 class ByteCodeLoader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ByteCodeLoader.class);
-  
-  
+
+
   private final LoadingCache<String, byte[]> byteCode = 
CacheBuilder.newBuilder().maximumSize(10000)
       .expireAfterWrite(10, TimeUnit.MINUTES).build(new 
ClassBytesCacheLoader());
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index 44ea5b5..bb24b57 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -21,10 +21,10 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.common.util.DrillStringUtils;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.compile.MergeAdapter.MergedClassResult;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.codehaus.commons.compiler.CompileException;
@@ -39,30 +39,12 @@ import com.google.common.collect.Sets;
 public class ClassTransformer {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ClassTransformer.class);
 
-  private ByteCodeLoader byteCodeLoader = new ByteCodeLoader();
-  private AtomicLong index = new AtomicLong(0);
-
-
-//  public <T, I> T getImplementationClassByBody( QueryClassLoader 
classLoader, TemplateClassDefinition<T> templateDefinition, //
-//      String internalClassBody //
-//  ) throws ClassTransformationException {
-//    final String materializedClassName = "org.apache.drill.generated."
-//        + "Gen" + templateDefinition.getExternalInterface().getSimpleName() 
//
-//        + index.getAndIncrement();
-//    // Get Implementation Class
-//    try {
-//      String classBody = ClassBodyBuilder.newBuilder() //
-//          .setClassName(materializedClassName) //
-//          .setBody(internalClassBody) //
-//          .build();
-//      return getImplementationClass(classLoader, templateDefinition, 
classBody, materializedClassName);
-//    } catch (IOException | CompileException e) {
-//      throw new ClassTransformationException("Failure generating class body 
for runtime generated class.", e);
-//    }
-//
-//  }
-
+  private final ByteCodeLoader byteCodeLoader = new ByteCodeLoader();
+  private final DistributedCache cache;
 
+  public ClassTransformer(DistributedCache cache) {
+    this.cache = cache;
+  }
 
   public static class ClassSet{
     public final ClassSet parent;
@@ -175,65 +157,8 @@ public class ClassTransformer {
         return false;
       return true;
     }
-
-
-//
-//    public ClassNames getFixed(ClassNames precompiled, ClassNames generated){
-//      if(!dot.startsWith(precompiled.dot)) throw new 
IllegalStateException(String.format("Expected a class that starts with %s.  
However the class %s does not start with this string.", precompiled.dot, dot));
-//      return new ClassNames(dot.replace(precompiled.dot, generated.dot));
-//    }
   }
 
-//
-//  private void mergeAndInjectClass(QueryClassLoader classLoader, byte[] 
implementationClass, ClassNames precompiled, ClassNames generated){
-//    // Get Template Class
-//    final byte[] templateClass = 
byteCodeLoader.getClassByteCodeFromPath(precompiled.clazz);
-//
-//    // get the runtime generated class's ClassNode from bytecode.
-//    ClassNode impl = getClassNodeFromByteCode(implementationClass);
-//
-//    // Setup adapters for merging, remapping class names and class writing. 
This is done in reverse order of how they
-//    // will be evaluated.
-//
-//    Stopwatch t3;
-//    {
-//
-//      //
-//      ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
-//
-//      ClassVisitor remappingAdapter = new RemappingClassAdapter(cw, 
remapper);
-//      MergeAdapter mergingAdapter = new MergeAdapter(oldTemplateSlashName, 
materializedSlashName, remappingAdapter,
-//          impl);
-//      ClassReader tReader = new ClassReader(templateClass);
-//      tReader.accept(mergingAdapter, ClassReader.EXPAND_FRAMES);
-//      byte[] outputClass = cw.toByteArray();
-////      Files.write(outputClass, new 
File(String.format("/tmp/%d-output.class", fileNum)));
-//      outputClass = cw.toByteArray();
-//
-//      // Load the class
-//      classLoader.injectByteCode(materializedClassName, outputClass);
-//    }
-//    t3.stop();
-//    Stopwatch t4 = new Stopwatch().start();
-//    int i = 0;
-//    for (String s : remapper.getInnerClasses()) {
-//      logger.debug("Setting up sub class {}", s);
-//      // for each sub class, remap them into the new class.
-//      String subclassPath = FileUtils.separator + s + ".class";
-//      final byte[] bytecode = getClassByteCodeFromPath(subclassPath);
-//      RemapClasses localRemapper = new RemapClasses(oldTemplateSlashName, 
materializedSlashName);
-//      Preconditions.checkArgument(localRemapper.getInnerClasses().isEmpty(), 
"Class transformations are only supported for classes that have a single level 
of inner classes.");
-//      ClassWriter subcw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
-//      ClassVisitor remap = new RemappingClassAdapter(subcw, localRemapper);
-//      ClassReader reader = new ClassReader(bytecode);
-//      reader.accept(remap, ClassReader.EXPAND_FRAMES);
-//      byte[] newByteCode = subcw.toByteArray();
-//      classLoader.injectByteCode(s.replace(oldTemplateSlashName, 
materializedSlashName).replace(FileUtils.separatorChar, '.'), newByteCode);
-////      Files.write(subcw.toByteArray(), new 
File(String.format("/tmp/%d-sub-%d.class", fileNum, i)));
-//      i++;
-//    }
-//  }
-//
   private static ClassNode getClassNodeFromByteCode(byte[] bytes) {
     ClassReader iReader = new ClassReader(bytes);
     ClassNode impl = new ClassNode();
@@ -241,10 +166,9 @@ public class ClassTransformer {
     return impl;
   }
 
-  @SuppressWarnings("unchecked")
-  public <T, I> T getImplementationClass( //
+  public Class<?> getImplementationClass( //
       QueryClassLoader classLoader, //
-      TemplateClassDefinition<T> templateDefinition, //
+      TemplateClassDefinition<?> templateDefinition, //
       String entireClass, //
       String materializedClassName) throws ClassTransformationException {
 
@@ -284,17 +208,16 @@ public class ClassTransformer {
 
       Class<?> c = classLoader.findClass(set.generated.dot);
       if (templateDefinition.getExternalInterface().isAssignableFrom(c)) {
-        T instance = (T) c.newInstance();
-        logger.debug("Done compiling (bytecode size={}, time:{} millis).",
-            DrillStringUtils.readable(totalBytecodeSize), (System.nanoTime() - 
t1) / 1000000);
-        return instance;
+        logger.debug("Done compiling (bytecode size={}, time:{} millis).", 
DrillStringUtils.readable(totalBytecodeSize), (System.nanoTime() - t1) / 
1000000);
+        return c;
       } else {
         throw new ClassTransformationException("The requested class did not 
implement the expected interface.");
       }
-    } catch (CompileException | IOException | ClassNotFoundException | 
InstantiationException | IllegalAccessException e) {
+    } catch (CompileException | IOException | ClassNotFoundException e) {
       throw new ClassTransformationException(String.format("Failure generating 
transformation classes for value: \n %s", entireClass), e);
     }
 
   }
 
-}
\ No newline at end of file
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
new file mode 100644
index 0000000..2edc902
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/CodeCompiler.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.local.LocalCache;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.store.sys.local.LocalPStoreProvider;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+public class CodeCompiler {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CodeCompiler.class);
+
+  private final ClassTransformer transformer;
+  private final DistributedCache distributedCache;
+  private final LoadingCache<CodeGenerator<?>, GeneratedClassEntry>  cache;
+  private final DrillConfig config;
+  private final OptionManager systemOptionManager;
+
+  public CodeCompiler(DrillConfig config, DistributedCache distributedCache, 
OptionManager systemOptionManager){
+    this.transformer = new ClassTransformer(distributedCache);
+    this.distributedCache = distributedCache;
+    this.cache = CacheBuilder.newBuilder().build(new Loader());
+    this.systemOptionManager = systemOptionManager;
+    this.config = config;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> T getImplementationClass(CodeGenerator<?> cg) throws 
ClassTransformationException, IOException {
+    cg.generate();
+    try {
+      GeneratedClassEntry ce = cache.get(cg);
+      return (T) ce.clazz.newInstance();
+    } catch (ExecutionException | InstantiationException | 
IllegalAccessException e) {
+      throw new ClassTransformationException(e);
+    }
+  }
+
+  private class Loader extends CacheLoader<CodeGenerator<?>, 
GeneratedClassEntry>{
+    @Override
+    public GeneratedClassEntry load(CodeGenerator<?> cg) throws Exception {
+      QueryClassLoader loader = new QueryClassLoader(config, 
systemOptionManager);
+      Class<?> c = transformer.getImplementationClass(loader, 
cg.getDefinition(), cg.getGeneratedCode(), cg.getMaterializedClassName());
+      return new GeneratedClassEntry(loader, c);
+    }
+  }
+
+  private class GeneratedClassEntry {
+
+    private final QueryClassLoader classLoader;
+    private final Class<?> clazz;
+
+    public GeneratedClassEntry(QueryClassLoader classLoader, Class<?> c) {
+      super();
+      this.classLoader = classLoader;
+      this.clazz = c;
+    }
+
+  }
+
+  public static CodeCompiler getTestCompiler(DrillConfig c) throws IOException{
+    return new CodeCompiler(c, new LocalCache(), new SystemOptionManager(c, 
new LocalPStoreProvider(c)).init());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/DrillInitMethodVisitor.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/DrillInitMethodVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/DrillInitMethodVisitor.java
new file mode 100644
index 0000000..859a14c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/DrillInitMethodVisitor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile;
+
+import org.apache.drill.exec.compile.sig.SignatureHolder;
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.Opcodes;
+
+public class DrillInitMethodVisitor extends MethodVisitor {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DrillInitMethodVisitor.class);
+
+  final String className;
+
+  public DrillInitMethodVisitor(String className, MethodVisitor mv){
+    super(Opcodes.ASM4, mv);
+    this.className = className;
+  }
+
+  @Override
+  public void visitInsn(int opcode) {
+    if(opcode == Opcodes.RETURN){
+      super.visitVarInsn(Opcodes.ALOAD, 0); // load this.
+      super.visitMethodInsn(Opcodes.INVOKEVIRTUAL, className, 
SignatureHolder.DRILL_INIT_METHOD, "()V"); // execute drill init.
+    }
+    super.visitInsn(opcode);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
index 30cde91..bd68b43 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/MergeAdapter.java
@@ -17,15 +17,17 @@
  */
 package org.apache.drill.exec.compile;
 
-import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.lang.reflect.Modifier;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.drill.exec.compile.ClassTransformer.ClassNames;
 import org.apache.drill.exec.compile.ClassTransformer.ClassSet;
+import org.apache.drill.exec.compile.bytecode.ValueHolderReplacementVisitor;
+import org.apache.drill.exec.compile.sig.SignatureHolder;
 import org.objectweb.asm.AnnotationVisitor;
 import org.objectweb.asm.ClassReader;
 import org.objectweb.asm.ClassVisitor;
@@ -40,10 +42,10 @@ import org.objectweb.asm.commons.SimpleRemapper;
 import org.objectweb.asm.tree.ClassNode;
 import org.objectweb.asm.tree.FieldNode;
 import org.objectweb.asm.tree.MethodNode;
+import org.objectweb.asm.util.CheckClassAdapter;
+import org.objectweb.asm.util.TraceClassVisitor;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
-import com.google.common.io.Files;
 
 /**
  * Serves two purposes. Renames all inner classes references to the outer 
class to the new name. Also adds all the
@@ -55,8 +57,9 @@ class MergeAdapter extends ClassVisitor {
 
   private ClassNode classToMerge;
   private ClassSet set;
-
   private Set<String> mergingNames = Sets.newHashSet();
+  private boolean hasInit;
+  private String name;
 
   private MergeAdapter(ClassSet set, ClassVisitor cv, ClassNode cn) {
     super(Opcodes.ASM4, cv);
@@ -65,6 +68,7 @@ class MergeAdapter extends ClassVisitor {
     for(Object o  : classToMerge.methods){
       String name = ((MethodNode)o).name;
       if(name.equals("<init>")) continue;
+      if(name.equals(SignatureHolder.DRILL_INIT_METHOD)) hasInit = true;
       mergingNames.add( name);
     }
   }
@@ -73,6 +77,7 @@ class MergeAdapter extends ClassVisitor {
   public void visitInnerClass(String name, String outerName, String innerName, 
int access) {
     // logger.debug(String.format("[Inner Class] Name: %s, outerName: %s, 
innerName: %s, templateName: %s, newName: %s.",
     // name, outerName, innerName, templateName, newName));
+
     if (name.startsWith(set.precompiled.slash)) {
 //      outerName = outerName.replace(precompiled.slash, generated.slash);
       name = name.replace(set.precompiled.slash, set.generated.slash);
@@ -92,6 +97,7 @@ class MergeAdapter extends ClassVisitor {
   // visit the class
   public void visit(int version, int access, String name, String signature, 
String superName, String[] interfaces) {
     // use the access and names of the impl class.
+    this.name = name;
     if(name.contains("$")){
       super.visit(version, access, name, signature, superName, interfaces);
     }else{
@@ -102,25 +108,32 @@ class MergeAdapter extends ClassVisitor {
   }
 
   @Override
-  public MethodVisitor visitMethod(int access, String arg1, String arg2, 
String arg3, String[] arg4) {
+  public MethodVisitor visitMethod(int access, String name, String desc, 
String signature, String[] exceptions) {
+
+
     // finalize all methods.
 
     // skip all abstract methods as they should have implementations.
-    if ((access & Modifier.ABSTRACT) != 0 || mergingNames.contains(arg1)) {
+    if ((access & Modifier.ABSTRACT) != 0 || mergingNames.contains(name)) {
 
 //      logger.debug("Skipping copy of '{}()' since it is abstract or listed 
elsewhere.", arg1);
       return null;
     }
-    if(arg3 != null){
-      arg3 = arg3.replace(set.precompiled.slash, set.generated.slash);
+    if(signature != null){
+      signature = signature.replace(set.precompiled.slash, 
set.generated.slash);
     }
     // if( (access & Modifier.PUBLIC) == 0){
     // access = access ^ Modifier.PUBLIC ^ Modifier.PROTECTED | 
Modifier.PRIVATE;
     // }
-    if (!arg1.equals("<init>")) {
+    MethodVisitor mv = super.visitMethod(access, name, desc, signature, 
exceptions);
+    if (!name.equals("<init>")) {
       access = access | Modifier.FINAL;
+    }else{
+      if(hasInit){
+        return new DrillInitMethodVisitor(this.name, mv);
+      }
     }
-    return super.visitMethod(access, arg1, arg2, arg3, arg4);
+    return mv;
   }
 
   @SuppressWarnings("unchecked")
@@ -135,19 +148,20 @@ class MergeAdapter extends ClassVisitor {
     for (Iterator<?> it = classToMerge.methods.iterator(); it.hasNext();) {
       MethodNode mn = (MethodNode) it.next();
 
-      // skip the init.
-      if (mn.name.equals("<init>"))
-        continue;
+      if (mn.name.equals("<init>")) continue;
 
       String[] exceptions = new String[mn.exceptions.size()];
       mn.exceptions.toArray(exceptions);
-      MethodVisitor mv = cv.visitMethod(mn.access | Modifier.FINAL, mn.name, 
mn.desc, mn.signature, exceptions);
+      MethodVisitor   mv = cv.visitMethod(mn.access | Modifier.FINAL, mn.name, 
mn.desc, mn.signature, exceptions);
+
       mn.instructions.resetLabels();
+
       // mn.accept(new RemappingMethodAdapter(mn.access, mn.desc, mv, new
       // SimpleRemapper("org.apache.drill.exec.compile.ExampleTemplate", 
"Bunky")));
       ClassSet top = set;
       while(top.parent != null) top = top.parent;
       mn.accept(new RemappingMethodAdapter(mn.access, mn.desc, mv, new 
SimpleRemapper(top.precompiled.slash, top.generated.slash)));
+
     }
     super.visitEnd();
   }
@@ -175,21 +189,31 @@ class MergeAdapter extends ClassVisitor {
     // Setup adapters for merging, remapping class names and class writing. 
This is done in reverse order of how they
     // will be evaluated.
 
-    ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
+    ClassWriter writer = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
     RemapClasses re = new RemapClasses(set);
-    ClassVisitor remappingAdapter = new RemappingClassAdapter(cw, re);
-    ClassVisitor visitor = remappingAdapter;
-    if(generatedClass != null){
-      visitor = new MergeAdapter(set, remappingAdapter, generatedClass);
-    }
-    ClassReader tReader = new ClassReader(precompiledClass);
-    tReader.accept(visitor, ClassReader.EXPAND_FRAMES);
-    byte[] outputClass = cw.toByteArray();
+    try{
+//      if(generatedClass != null){
+//        ClassNode generatedMerged = new ClassNode();
+//        generatedClass.accept(new 
ValueHolderReplacementVisitor(generatedMerged));
+//        generatedClass = generatedMerged;
+//      }
+      ClassVisitor remappingAdapter = new RemappingClassAdapter(writer, re);
+      ClassVisitor visitor = remappingAdapter;
+      if(generatedClass != null){
+        visitor = new MergeAdapter(set, remappingAdapter, generatedClass);
+      }
+      ClassReader tReader = new ClassReader(precompiledClass);
+      tReader.accept(visitor, ClassReader.SKIP_FRAMES);
+      byte[] outputClass = writer.toByteArray();
 
-    // enable when you want all the generated merged class files to also be 
written to disk.
-    //Files.write(outputClass, new 
File(String.format("/tmp/drill-generated-classes/%s-output.class", 
set.generated.dot)));
+      // enable when you want all the generated merged class files to also be 
written to disk.
+//      Files.write(outputClass, new 
File(String.format("/src/scratch/drill-generated-classes/%s-output.class", 
set.generated.dot)));
 
-    return new MergedClassResult(outputClass, re.getInnerClasses());
+      return new MergedClassResult(outputClass, re.getInnerClasses());
+    }catch(Error | RuntimeException e){
+      logger.error("Failure while merging classes.", e);
+      throw e;
+    }
   }
 
 
@@ -227,4 +251,36 @@ class MergeAdapter extends ClassVisitor {
     }
 
   }
+
+  private static final void check(ClassNode node) {
+    Exception e = null;
+    String error = "";
+
+    try{
+    ClassWriter cw = new ClassWriter(ClassWriter.COMPUTE_FRAMES);
+    ClassVisitor cv = new CheckClassAdapter(cw, true);
+    node.accept(cv);
+
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    CheckClassAdapter.verify(new ClassReader(cw.toByteArray()), false, pw);
+
+    error = sw.toString();
+    }catch(Exception ex){
+      e = ex;
+    }
+
+    if(!error.isEmpty() || e != null){
+      StringWriter sw2 = new StringWriter();
+      PrintWriter pw2 = new PrintWriter(sw2);
+      TraceClassVisitor v = new TraceClassVisitor(pw2);
+      node.accept(v);
+      if(e != null){
+        throw new RuntimeException("Failure validating class.  ByteCode: \n" + 
sw2.toString() + "\n\n====ERRROR====\n" + error, e);
+      }else{
+        throw new RuntimeException("Failure validating class.  ByteCode: \n" + 
sw2.toString() + "\n\n====ERRROR====\n" + error);
+      }
+
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
index 688e680..2b6ddf0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/TemplateClassDefinition.java
@@ -22,14 +22,14 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.drill.exec.compile.sig.SignatureHolder;
 
 public class TemplateClassDefinition<T>{
-  
+
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TemplateClassDefinition.class);
-  
+
   private final Class<T> iface;
   private final Class<?> template;
   private final SignatureHolder signature;
   private static final AtomicLong classNumber = new AtomicLong(0);
-  
+
   public <X extends T> TemplateClassDefinition(Class<T> iface, Class<X> 
template) {
     super();
     this.iface = iface;
@@ -47,7 +47,7 @@ public class TemplateClassDefinition<T>{
   public long getNextClassNumber(){
     return classNumber.getAndIncrement();
   }
-  
+
   public Class<T> getExternalInterface() {
     return iface;
   }
@@ -66,6 +66,11 @@ public class TemplateClassDefinition<T>{
     return "TemplateClassDefinition [template=" + template + ", signature=" + 
signature + "]";
   }
 
-  
-  
+  @Override
+  public boolean equals(Object obj) {
+    return super.equals(obj);
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/DirectSorter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/DirectSorter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/DirectSorter.java
new file mode 100644
index 0000000..4e54bc7
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/DirectSorter.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile.bytecode;
+
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.commons.LocalVariablesSorter;
+
+public class DirectSorter extends LocalVariablesSorter{
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DirectSorter.class);
+
+  public DirectSorter(int access, String desc, MethodVisitor mv) {
+    super(access, desc, mv);
+  }
+
+  public void directVarInsn(int opcode, int var) {
+    mv.visitVarInsn(opcode, var);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
new file mode 100644
index 0000000..e736aab
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/InstructionModifier.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile.bytecode;
+
+import org.apache.drill.exec.compile.bytecode.ValueHolderIden.ValueHolderSub;
+import org.objectweb.asm.Label;
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.Type;
+
+import com.carrotsearch.hppc.IntIntOpenHashMap;
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+
+public class InstructionModifier extends MethodVisitor {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(InstructionModifier.class);
+
+  private final IntObjectOpenHashMap<ValueHolderIden.ValueHolderSub> oldToNew 
= new IntObjectOpenHashMap<>();
+  private final IntIntOpenHashMap oldLocalToFirst = new IntIntOpenHashMap();
+
+  private DirectSorter adder;
+  int lastLineNumber = 0;
+  private TrackingInstructionList list;
+
+  public InstructionModifier(int access, String name, String desc, String 
signature, String[] exceptions,
+      TrackingInstructionList list, MethodVisitor inner) {
+    super(Opcodes.ASM4, new DirectSorter(access, desc, inner));
+    this.list = list;
+    this.adder = (DirectSorter) mv;
+  }
+
+  public void setList(TrackingInstructionList list) {
+    this.list = list;
+  }
+
+  private ReplacingBasicValue local(int var) {
+    Object o = list.currentFrame.getLocal(var);
+    if (o instanceof ReplacingBasicValue) {
+      return (ReplacingBasicValue) o;
+    }
+    return null;
+  }
+
+  private ReplacingBasicValue popCurrent() {
+    return popCurrent(false);
+  }
+
+  private ReplacingBasicValue popCurrent(boolean includeReturnVals) {
+    // for vararg, we could try to pop an empty stack. TODO: handle this 
better.
+    if (list.currentFrame.getStackSize() == 0)
+      return null;
+
+    Object o = list.currentFrame.pop();
+    if (o instanceof ReplacingBasicValue) {
+      ReplacingBasicValue v = (ReplacingBasicValue) o;
+      if (!v.isFunctionReturn || includeReturnVals) {
+        return v;
+      }
+    }
+    return null;
+  }
+
+  private ReplacingBasicValue getReturn() {
+    Object o = list.nextFrame.getStack(list.nextFrame.getStackSize() - 1);
+    if (o instanceof ReplacingBasicValue)
+      return (ReplacingBasicValue) o;
+    return null;
+  }
+
+  @Override
+  public void visitInsn(int opcode) {
+    switch (opcode) {
+    case Opcodes.DUP:
+      if (popCurrent() != null)
+        return;
+    }
+    super.visitInsn(opcode);
+  }
+
+  @Override
+  public void visitTypeInsn(int opcode, String type) {
+    ReplacingBasicValue r = getReturn();
+    if (r != null) {
+      ValueHolderSub sub = r.getIden().getHolderSub(adder);
+      oldToNew.put(r.getIndex(), sub);
+    } else {
+      super.visitTypeInsn(opcode, type);
+    }
+  }
+
+  @Override
+  public void visitLineNumber(int line, Label start) {
+    lastLineNumber = line;
+    super.visitLineNumber(line, start);
+  }
+
+  @Override
+  public void visitVarInsn(int opcode, int var) {
+    ReplacingBasicValue v;
+    if(opcode == Opcodes.ASTORE && (v = popCurrent(true)) != null){
+      if(!v.isFunctionReturn){
+        ValueHolderSub from = oldToNew.get(v.getIndex());
+
+        ReplacingBasicValue current = local(var);
+        // if local var is set, then transfer to it to the existing holders in 
the local position.
+        if(current != null){
+          if(oldToNew.get(current.getIndex()).iden() == from.iden()){
+            int targetFirst = oldToNew.get(current.index).first();
+            from.transfer(this, targetFirst);
+            return;
+          }
+        }
+
+        // if local var is not set, then check map to see if existing holders 
are mapped to local var.
+        if(oldLocalToFirst.containsKey(var)){
+          ValueHolderSub sub = oldToNew.get(oldLocalToFirst.lget());
+          if(sub.iden() == from.iden()){
+            // if they are, then transfer to that.
+            from.transfer(this, oldToNew.get(oldLocalToFirst.lget()).first());
+            return;
+          }
+        }
+
+
+        // map from variables to global space for future use.
+        oldLocalToFirst.put(var, v.getIndex());
+
+      }else{
+        // this is storage of a function return, we need to map the fields to 
the holder spots.
+        int first;
+        if(oldLocalToFirst.containsKey(var)){
+          first = oldToNew.get(oldLocalToFirst.lget()).first();
+          v.iden.transferToLocal(adder, first);
+        }else{
+          first = v.iden.createLocalAndTrasfer(adder);
+        }
+        ValueHolderSub from = v.iden.getHolderSubWithDefinedLocals(first);
+        oldToNew.put(v.getIndex(), from);
+        v.disableFunctionReturn();
+      }
+
+    }else if(opcode == Opcodes.ALOAD && (v = getReturn()) != null){
+
+      // noop.
+    }else{
+      super.visitVarInsn(opcode, var);
+    }
+
+
+  }
+
+  void directVarInsn(int opcode, int var) {
+    adder.directVarInsn(opcode, var);
+  }
+
+  @Override
+  public void visitFieldInsn(int opcode, String owner, String name, String 
desc) {
+    ReplacingBasicValue v;
+
+    switch (opcode) {
+    case Opcodes.PUTFIELD:
+      // pop twice for put.
+      v = popCurrent(true);
+      if (v != null) {
+        if(v.isFunctionReturn){
+          super.visitFieldInsn(opcode, owner, name, desc);
+          return;
+        }else{
+          // we are trying to store a replaced variable in an external 
context, we need to generate an instance and
+          // transfer it out.
+          ValueHolderSub sub = oldToNew.get(v.getIndex());
+          sub.transferToExternal(adder, owner, name, desc);
+          return;
+        }
+      }
+
+    case Opcodes.GETFIELD:
+      // pop again.
+      v = popCurrent();
+      if (v != null) {
+        // super.visitFieldInsn(opcode, owner, name, desc);
+        ValueHolderSub sub = oldToNew.get(v.getIndex());
+        sub.addInsn(name, this, opcode);
+        return;
+      }
+
+
+    }
+
+    super.visitFieldInsn(opcode, owner, name, desc);
+  }
+
+  @Override
+  public void visitLocalVariable(String name, String desc, String signature, 
Label start, Label end, int index) {
+    // if (!ReplacingInterpreter.HOLDER_DESCRIPTORS.contains(desc)) {
+    // super.visitLocalVariable(name, desc, signature, start, end, index);
+    // }
+  }
+
+  @Override
+  public void visitMethodInsn(int opcode, String owner, String name, String 
desc) {
+    final int len = Type.getArgumentTypes(desc).length;
+    final boolean isStatic = opcode == Opcodes.INVOKESTATIC;
+
+    ReplacingBasicValue obj = popCurrent();
+
+    if (obj != null && !isStatic) {
+      if ("<init>".equals(name)) {
+        oldToNew.get(obj.getIndex()).init(adder);
+      } else {
+        throw new IllegalStateException("you can't call a method on a value 
holder.");
+      }
+      return;
+    }
+
+    obj = getReturn();
+
+    if (obj != null) {
+      // the return of this method is an actual instance of the object we're 
escaping. Update so that it get's mapped
+      // correctly.
+      super.visitMethodInsn(opcode, owner, name, desc);
+      obj.markFunctionReturn();
+      return;
+    }
+
+    int i = isStatic ? 1 : 0;
+    for (; i < len; i++) {
+      checkArg(name, popCurrent());
+    }
+
+    super.visitMethodInsn(opcode, owner, name, desc);
+  }
+
+  private void checkArg(String name, ReplacingBasicValue obj) {
+    if (obj == null)
+      return;
+    throw new IllegalStateException(
+        String
+            .format(
+                "Holder types are not allowed to be passed between methods.  
Ran across problem attempting to invoke method '%s' on line number %d",
+                name, lastLineNumber));
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ReplacingBasicValue.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ReplacingBasicValue.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ReplacingBasicValue.java
new file mode 100644
index 0000000..8e05602
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ReplacingBasicValue.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile.bytecode;
+
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.analysis.BasicValue;
+
+public class ReplacingBasicValue extends BasicValue{
+
+  ValueHolderIden iden;
+  int index;
+  Type type;
+  boolean isFunctionReturn = false;
+
+  public ReplacingBasicValue(Type type, ValueHolderIden iden, int index) {
+    super(type);
+    this.index = index;
+    this.iden = iden;
+    this.type = type;
+  }
+
+  public void markFunctionReturn(){
+    this.isFunctionReturn = true;
+  }
+
+  public void disableFunctionReturn(){
+    this.isFunctionReturn = false;
+  }
+
+  public ValueHolderIden getIden() {
+    return iden;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  public Type getType(){
+    return type;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ReplacingInterpreter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ReplacingInterpreter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ReplacingInterpreter.java
new file mode 100644
index 0000000..2dce4db
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ReplacingInterpreter.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile.bytecode;
+
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.TypeInsnNode;
+import org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.objectweb.asm.tree.analysis.BasicInterpreter;
+import org.objectweb.asm.tree.analysis.BasicValue;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class ReplacingInterpreter extends BasicInterpreter {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ReplacingInterpreter.class);
+
+  private int index = 0;
+
+  @Override
+  public BasicValue newValue(Type t) {
+    if(t != null){
+      ValueHolderIden iden = HOLDERS.get(t.getDescriptor());
+      if(iden != null){
+        ReplacingBasicValue v = new ReplacingBasicValue(t, iden, index++);
+        v.markFunctionReturn();
+        return v;
+      }
+    }
+    return super.newValue(t);
+
+  }
+
+  @Override
+  public BasicValue newOperation(AbstractInsnNode insn) throws 
AnalyzerException {
+    if(insn.getOpcode() == Opcodes.NEW){
+      TypeInsnNode t = (TypeInsnNode) insn;
+      ValueHolderIden iden = HOLDERS.get(t.desc);
+
+      if(iden != null){
+        return new ReplacingBasicValue(Type.getObjectType(t.desc), iden, 
index++);
+      }
+    }
+
+    return super.newOperation(insn);
+  }
+
+  private static String desc(Class<?> c) {
+    Type t = Type.getType(c);
+    return t.getDescriptor();
+  }
+
+
+
+  static {
+
+    ImmutableMap.Builder<String, ValueHolderIden> builder = 
ImmutableMap.builder();
+    ImmutableSet.Builder<String> setB = ImmutableSet.builder();
+    for (Class<?> c : ScalarReplacementTypes.CLASSES) {
+      String desc = desc(c);
+      setB.add(desc);
+      String desc2 = desc.substring(1, desc.length() - 1);
+      ValueHolderIden vhi = new ValueHolderIden(c);
+      builder.put(desc, vhi);
+      builder.put(desc2, vhi);
+    }
+    HOLDER_DESCRIPTORS = setB.build();
+    HOLDERS = builder.build();
+  }
+
+  private final static ImmutableMap<String, ValueHolderIden> HOLDERS;
+  public final static ImmutableSet<String> HOLDER_DESCRIPTORS;
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
new file mode 100644
index 0000000..8a38d32
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementNode.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile.bytecode;
+
+
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.tree.MethodNode;
+import org.objectweb.asm.tree.analysis.Analyzer;
+import org.objectweb.asm.tree.analysis.AnalyzerException;
+import org.objectweb.asm.tree.analysis.BasicValue;
+import org.objectweb.asm.tree.analysis.Frame;
+
+public class ScalarReplacementNode extends MethodNode {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ScalarReplacementNode.class);
+
+
+  String[] exceptionsArr;
+  MethodVisitor inner;
+
+  public ScalarReplacementNode(int access, String name, String desc, String 
signature, String[] exceptions, MethodVisitor inner) {
+    super(access, name, desc, signature, exceptions);
+    this.exceptionsArr = exceptions;
+    this.inner = inner;
+  }
+
+
+  @Override
+  public void visitEnd() {
+    super.visitEnd();
+
+    Analyzer<BasicValue> a = new Analyzer<>(new ReplacingInterpreter());
+    Frame<BasicValue>[] frames;
+    try {
+      frames = a.analyze("Object", this);
+    } catch (AnalyzerException e) {
+      throw new IllegalStateException(e);
+    }
+    TrackingInstructionList list = new TrackingInstructionList(frames, 
this.instructions);
+    this.instructions = list;
+    InstructionModifier holderV = new InstructionModifier(this.access, 
this.name, this.desc, this.signature, this.exceptionsArr, list, inner);
+    accept(holderV);
+  }
+
+
+  IntHolder local;
+  public void x(){
+    IntHolder h = new IntHolder();
+    local = h;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementTypes.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementTypes.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementTypes.java
new file mode 100644
index 0000000..a54ca72
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ScalarReplacementTypes.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile.bytecode;
+
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Decimal18Holder;
+import org.apache.drill.exec.expr.holders.Decimal28DenseHolder;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38DenseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal9Holder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.IntervalDayHolder;
+import org.apache.drill.exec.expr.holders.IntervalHolder;
+import org.apache.drill.exec.expr.holders.IntervalYearHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal18Holder;
+import org.apache.drill.exec.expr.holders.NullableDecimal28DenseHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal38DenseHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.NullableDecimal9Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableIntervalDayHolder;
+import org.apache.drill.exec.expr.holders.NullableIntervalHolder;
+import org.apache.drill.exec.expr.holders.NullableIntervalYearHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampTZHolder;
+import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TimeStampTZHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+
+import com.google.common.collect.ImmutableSet;
+
+public class ScalarReplacementTypes {
+
+  private ScalarReplacementTypes(){}
+
+  static {
+
+    Class<?>[] classList = {
+        BitHolder.class,
+        IntHolder.class,
+        BigIntHolder.class,
+        Float4Holder.class,
+        Float8Holder.class,
+        Decimal9Holder.class,
+        Decimal18Holder.class,
+        Decimal28SparseHolder.class,
+        Decimal28DenseHolder.class,
+        Decimal38SparseHolder.class,
+        Decimal38DenseHolder.class,
+        IntervalHolder.class,
+        IntervalDayHolder.class,
+        IntervalYearHolder.class,
+        DateHolder.class,
+        TimeHolder.class,
+        TimeStampHolder.class,
+        TimeStampTZHolder.class,
+        VarCharHolder.class,
+        VarBinaryHolder.class,
+        NullableBitHolder.class,
+        NullableIntHolder.class,
+        NullableBigIntHolder.class,
+        NullableFloat4Holder.class,
+        NullableFloat8Holder.class,
+        NullableVarCharHolder.class,
+        NullableVarBinaryHolder.class,
+        NullableDecimal9Holder.class,
+        NullableDecimal18Holder.class,
+        NullableDecimal28SparseHolder.class,
+        NullableDecimal28DenseHolder.class,
+        NullableDecimal38SparseHolder.class,
+        NullableDecimal38DenseHolder.class,
+        NullableIntervalHolder.class,
+        NullableIntervalDayHolder.class,
+        NullableIntervalYearHolder.class,
+        NullableDateHolder.class,
+        NullableTimeHolder.class,
+        NullableTimeStampHolder.class,
+        NullableTimeStampTZHolder.class
+    };
+
+    CLASSES = ImmutableSet.copyOf(classList);
+  }
+
+  public static final ImmutableSet<Class<?>> CLASSES;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/TrackingInstructionList.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/TrackingInstructionList.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/TrackingInstructionList.java
new file mode 100644
index 0000000..7cab17c
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/TrackingInstructionList.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile.bytecode;
+
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.tree.AbstractInsnNode;
+import org.objectweb.asm.tree.InsnList;
+import org.objectweb.asm.tree.analysis.Frame;
+
+public class TrackingInstructionList extends InsnList {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(TrackingInstructionList.class);
+
+  Frame<?> currentFrame;
+  Frame<?> nextFrame;
+  Frame<?>[] frames;
+  InsnList inner;
+  int index = 0;
+
+
+
+  public TrackingInstructionList(Frame<?>[] frames, InsnList inner) {
+    super();
+
+    this.frames = frames;
+    this.inner = inner;
+  }
+
+  public InsnList getInner(){
+    return inner;
+  }
+
+  @Override
+  public void accept(MethodVisitor mv) {
+    AbstractInsnNode insn = inner.getFirst();
+    while (insn != null) {
+        currentFrame = frames[index];
+        nextFrame = index +1 < frames.length ? frames[index+1] : null;
+        insn.accept(mv);
+
+        insn = insn.getNext();
+        index++;
+    }
+  }
+
+
+  @Override
+  public int size() {
+    return inner.size();
+  }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ValueHolderIden.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ValueHolderIden.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ValueHolderIden.java
new file mode 100644
index 0000000..5099fbd
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/bytecode/ValueHolderIden.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.compile.bytecode;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.List;
+
+import org.objectweb.asm.MethodVisitor;
+import org.objectweb.asm.Opcodes;
+import org.objectweb.asm.Type;
+import org.objectweb.asm.commons.LocalVariablesSorter;
+
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import com.google.common.collect.Lists;
+
+class ValueHolderIden {
+
+  final ObjectIntOpenHashMap<String> fieldMap;
+  final Type[] types;
+  final String[] names;
+  final Type type;
+
+  public ValueHolderIden(Class<?> c) {
+    Field[] fields = c.getFields();
+
+    List<Field> fldList = Lists.newArrayList();
+    for(Field f : fields){
+      if(!Modifier.isStatic(f.getModifiers())) {
+        fldList.add(f);
+      }
+    }
+    this.type = Type.getType(c);
+    this.types = new Type[fldList.size()];
+    this.names = new String[fldList.size()];
+    fieldMap = new ObjectIntOpenHashMap<String>();
+    int i =0;
+    for(Field f : fldList){
+      types[i] = Type.getType(f.getType());
+      names[i] = f.getName();
+      fieldMap.put(f.getName(), i);
+      i++;
+    }
+  }
+
+  private static void initType(int index, Type t, DirectSorter v){
+    switch(t.getSort()){
+    case Type.BOOLEAN:
+    case Type.BYTE:
+    case Type.CHAR:
+    case Type.SHORT:
+    case Type.INT:
+      v.visitInsn(Opcodes.ICONST_0);
+      v.directVarInsn(Opcodes.ISTORE, index);
+      break;
+    case Type.LONG:
+      v.visitInsn(Opcodes.LCONST_0);
+      v.directVarInsn(Opcodes.LSTORE, index);
+      break;
+    case Type.FLOAT:
+      v.visitInsn(Opcodes.FCONST_0);
+      v.directVarInsn(Opcodes.FSTORE, index);
+      break;
+    case Type.DOUBLE:
+      v.visitInsn(Opcodes.DCONST_0);
+      v.directVarInsn(Opcodes.DSTORE, index);
+      break;
+    case Type.OBJECT:
+      v.visitInsn(Opcodes.ACONST_NULL);
+      v.directVarInsn(Opcodes.ASTORE, index);
+      break;
+    default:
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public ValueHolderSub getHolderSub(DirectSorter adder) {
+    int first = -1;
+    for (int i = 0; i < types.length; i++) {
+      int varIndex = adder.newLocal(types[i]);
+      if (i == 0) {
+        first = varIndex;
+      }
+    }
+
+    return new ValueHolderSub(first);
+
+  }
+
+  public ValueHolderSub getHolderSubWithDefinedLocals(int first){
+    return new ValueHolderSub(first);
+  }
+
+  private int dup(Type t){
+    return t.getSize() == 1 ? Opcodes.DUP : Opcodes.DUP2;
+  }
+
+  public void transferToLocal(DirectSorter adder, int localVariable){
+    for (int i = 0; i < types.length; i++) {
+      Type t = types[i];
+      if(i + 1 < types.length) adder.visitInsn(dup(t)); // don't dup for last 
value.
+      adder.visitFieldInsn(Opcodes.GETFIELD, type.getInternalName(), names[i], 
t.getDescriptor());
+      adder.directVarInsn(t.getOpcode(Opcodes.ISTORE), localVariable+i);
+    }
+  }
+
+
+
+
+  public int createLocalAndTrasfer(DirectSorter adder){
+    int first = 0;
+    for (int i = 0; i < types.length; i++) {
+      Type t = types[i];
+      int varIndex = adder.newLocal(t);
+      if (i == 0) {
+        first = varIndex;
+      }
+    }
+    transferToLocal(adder, first);
+    return first;
+  }
+
+  public class ValueHolderSub {
+    private int first;
+
+    public ValueHolderSub(int first) {
+      assert first != -1 : "Create Holder for sub that doesn't have any 
fields.";
+      this.first = first;
+    }
+
+    public ValueHolderIden iden(){
+      return ValueHolderIden.this;
+    }
+
+    public void init(DirectSorter mv){
+      for (int i = 0; i < types.length; i++) {
+        initType(first+i, types[i], mv);
+      }
+    }
+    public int size(){
+      return types.length;
+    }
+
+    public int first(){
+      return first;
+    }
+
+    public void updateFirst(int newFirst){
+      this.first = newFirst;
+    }
+
+    private int field(String name, InstructionModifier mv) {
+      if (!fieldMap.containsKey(name)) throw new 
IllegalArgumentException(String.format("Unknown name '%s' on line %d.", name, 
mv.lastLineNumber));
+      return fieldMap.lget();
+    }
+
+    public void addInsn(String name, InstructionModifier mv, int opcode) {
+      switch (opcode) {
+      case Opcodes.GETFIELD:
+        addKnownInsn(name, mv, Opcodes.ILOAD);
+        return;
+
+      case Opcodes.PUTFIELD:
+        addKnownInsn(name, mv, Opcodes.ISTORE);
+      }
+    }
+
+    public void transfer(InstructionModifier mv, int newStart){
+      if(first == newStart) return;
+      for(int i =0; i < types.length; i++){
+        mv.directVarInsn(types[i].getOpcode(Opcodes.ILOAD), first + i);
+        mv.directVarInsn(types[i].getOpcode(Opcodes.ISTORE), newStart + i);
+      }
+      this.first = newStart;
+    }
+
+    private void addKnownInsn(String name, InstructionModifier mv, int 
analogOpcode) {
+      int f = field(name, mv);
+      Type t = types[f];
+      mv.directVarInsn(t.getOpcode(analogOpcode), first + f);
+    }
+
+    public void transferToExternal(DirectSorter adder, String owner, String 
name, String desc){
+
+      // create a new object and assign it to the desired field.
+      adder.visitTypeInsn(Opcodes.NEW, type.getInternalName());
+      adder.visitInsn(dup(type));
+      adder.visitMethodInsn(Opcodes.INVOKESPECIAL, type.getInternalName(), 
"<init>", "()V");
+
+      // now we need to set all of the values of this new object.
+      for (int i = 0; i < types.length; i++) {
+        Type t = types[i];
+
+        // dup the object where we are putting the field.
+        adder.visitInsn(dup(type)); // dup for every as we want to save in 
place at end.
+        adder.directVarInsn(t.getOpcode(Opcodes.ILOAD), first+i);
+        adder.visitFieldInsn(Opcodes.PUTFIELD, type.getInternalName(), 
names[i], t.getDescriptor());
+      }
+
+      // lastly we save it to the desired field.
+      adder.visitFieldInsn(Opcodes.PUTFIELD, owner, name, desc);
+
+    }
+
+  }
+
+
+}
\ No newline at end of file

Reply via email to