Author: sershe
Date: Mon Feb 2 22:28:15 2015
New Revision: 1656595
URL: http://svn.apache.org/r1656595
Log:
HIVE-9418p2 : Part of the encoded data production pipeline (incomplete, only to allow parallel work)
Added:
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
Removed:
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadata.java
Modified:
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
hive/branches/llap/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/EncodedColumn.java Mon Feb 2 22:28:15 2015
@@ -18,24 +18,42 @@
package org.apache.hadoop.hive.llap.io.api;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
public class EncodedColumn<BatchKey> {
// TODO: temporary class. Will be filled in when reading (ORC) is implemented. Need to balance
// generality, and ability to not copy data from underlying low-level cached buffers.
- public static class ColumnBuffer {
+ public static class StreamBuffer {
+ public StreamBuffer(int firstOffset, int lastLength) {
+ this.firstOffset = firstOffset;
+ this.lastLength = lastLength;
+ }
// TODO: given how ORC will allocate, it might make sense to share array between all
// returned encodedColumn-s, and store index and length in the array.
- public LlapMemoryBuffer[] cacheBuffers;
+ public List<LlapMemoryBuffer> cacheBuffers;
public int firstOffset, lastLength;
+ // StreamBuffer can be reused for many RGs (e.g. dictionary case). To avoid locking every
+ // LlapMemoryBuffer 500 times, have a separate refcount on StreamBuffer itself.
+ public AtomicInteger refCount = new AtomicInteger(0);
+ public void incRef() {
+ refCount.incrementAndGet();
+ }
+ public int decRef() {
+ return refCount.decrementAndGet();
+ }
}
- public EncodedColumn(BatchKey batchKey, int columnIndex, ColumnBuffer columnData) {
+ public EncodedColumn(BatchKey batchKey, int columnIndex, int streamCount) {
this.batchKey = batchKey;
this.columnIndex = columnIndex;
- this.columnData = columnData;
+ this.streamData = new StreamBuffer[streamCount];
+ this.streamKind = new int[streamCount];
}
public BatchKey batchKey;
public int columnIndex;
- public ColumnBuffer columnData;
+ public StreamBuffer[] streamData;
+ public int[] streamKind; // TODO: can decoder infer this from metadata?
}
\ No newline at end of file
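The refcounting introduced above is easier to see in isolation. Below is a minimal, self-contained sketch of the pattern, under the assumption that one StreamBuffer is shared by many row groups; the names (MemBuffer, BufferCache, SharedStreamBuffer) are hypothetical stand-ins, not the actual LLAP classes. The idea is a single AtomicInteger refcount on the shared object, incremented once per consumer, with the underlying cache buffers released only when the count drops to zero.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for LlapMemoryBuffer and the low-level cache.
class MemBuffer {}
interface BufferCache { void release(List<MemBuffer> buffers); }

// Simplified analogue of EncodedColumn.StreamBuffer: many row groups can share it,
// so one refcount lives here instead of a lock on every MemBuffer.
class SharedStreamBuffer {
  final List<MemBuffer> cacheBuffers = new ArrayList<>();
  private final AtomicInteger refCount = new AtomicInteger(0);

  void incRef() { refCount.incrementAndGet(); }
  int decRef() { return refCount.decrementAndGet(); }
}

class RefCountExample {
  // Consumer-side pattern (compare returnProcessed() in ColumnVectorProducer below):
  // decrement, and only hand the buffers back to the cache at zero.
  static void returnProcessed(SharedStreamBuffer sb, BufferCache cache) {
    if (sb.decRef() == 0) {
      cache.release(sb.cacheBuffers);
    }
  }
}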
Modified: hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java (original)
+++ hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java Mon Feb 2 22:28:15 2015
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.llap.io.api.cache;
import java.util.LinkedList;
+import java.util.List;
import org.apache.hadoop.hive.common.DiskRange;
@@ -48,6 +49,8 @@ public interface LowLevelCache {
*/
void allocateMultiple(LlapMemoryBuffer[] dest, int size);
- void releaseBuffers(LlapMemoryBuffer[] cacheBuffers);
+ void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers);
+
+ LlapMemoryBuffer createUnallocated();
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/Cache.java Mon Feb 2 22:28:15 2015
@@ -18,10 +18,10 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
/** Dummy interface for now, might be different. */
public interface Cache<CacheKey> {
- public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value);
- public ColumnBuffer get(CacheKey key);
+ public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value);
+ public StreamBuffer get(CacheKey key);
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java Mon Feb 2 22:28:15 2015
@@ -121,8 +121,7 @@ public class LowLevelCacheImpl implement
if (currentNotCached.offset == currentCached.offset) {
if (currentNotCached.end <= currentCached.end) { // we assume it's always "==" now
// Replace the entire current DiskRange with new cached range.
- drIter.remove();
- drIter.add(currentCached);
+ drIter.set(currentCached);
currentNotCached = null;
} else {
// Insert the new cache range before the disk range.
@@ -251,9 +250,9 @@ public class LowLevelCacheImpl implement
}
@Override
- public void releaseBuffers(LlapMemoryBuffer[] cacheBuffers) {
- for (int i = 0; i < cacheBuffers.length; ++i) {
- releaseBufferInternal((LlapCacheableBuffer)cacheBuffers[i]);
+ public void releaseBuffers(List<LlapMemoryBuffer> cacheBuffers) {
+ for (LlapMemoryBuffer b : cacheBuffers) {
+ releaseBufferInternal((LlapCacheableBuffer)b);
}
}
@@ -399,4 +398,9 @@ public class LowLevelCacheImpl implement
}
}
}
+
+ @Override
+ public LlapMemoryBuffer createUnallocated() {
+ return new LlapCacheableBuffer();
+ }
}
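The drIter change above swaps a remove-then-add pair for ListIterator.set(), which replaces the element the iterator just returned, in place. A trivial standalone illustration (hypothetical strings in place of DiskRange objects):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

class ListIteratorSetDemo {
  public static void main(String[] args) {
    List<String> ranges = new ArrayList<>(Arrays.asList("not-cached", "other"));
    ListIterator<String> it = ranges.listIterator();
    while (it.hasNext()) {
      if (it.next().equals("not-cached")) {
        it.set("cached-range"); // replaces the last returned element, no remove()+add()
      }
    }
    System.out.println(ranges); // [cached-range, other]
  }
}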
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelLrfuCachePolicy.java Mon Feb 2 22:28:15 2015
@@ -165,7 +165,6 @@ public class LowLevelLrfuCachePolicy ext
continue;
}
// Update the state to removed-from-list, so that parallel notifyUnlock doesn't modify us.
- // TODO#: double check this is valid!
nextCandidate.indexInHeap = LlapCacheableBuffer.NOT_IN_CACHE;
evicted += nextCandidate.byteBuffer.remaining();
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/NoopCache.java Mon Feb 2 22:28:15 2015
@@ -18,16 +18,16 @@
package org.apache.hadoop.hive.llap.cache;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
public class NoopCache<CacheKey> implements Cache<CacheKey> {
@Override
- public ColumnBuffer cacheOrGet(CacheKey key, ColumnBuffer value) {
+ public StreamBuffer cacheOrGet(CacheKey key, StreamBuffer value) {
return value;
}
@Override
- public ColumnBuffer get(CacheKey key) {
+ public StreamBuffer get(CacheKey key) {
return null; // TODO: ensure real implementation increases refcount
}
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java Mon Feb 2 22:28:15 2015
@@ -68,8 +68,9 @@ public class LlapInputFormat
if (includedCols.isEmpty()) {
includedCols = null; // Also means read all columns? WTF?
}
- VectorReader reader = llapIo.getReader(
- fileSplit, includedCols, SearchArgumentFactory.createFromConf(job));
+ VectorReader reader = llapIo.getReader(fileSplit, includedCols,
+ SearchArgumentFactory.createFromConf(job),
+ ColumnProjectionUtils.getReadColumnNames(job));
return new LlapRecordReader(reader, job, fileSplit);
} catch (Exception ex) {
throw new IOException(ex);
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapIoImpl.java Mon Feb 2 22:28:15 2015
@@ -70,8 +70,9 @@ public class LlapIoImpl implements LlapI
this.cvp = new OrcColumnVectorProducer(threadPool, edp, conf);
}
- VectorReader getReader(InputSplit split, List<Integer> columnIds, SearchArgument sarg) {
- return new VectorReader(split, columnIds, sarg, cvp);
+ VectorReader getReader(InputSplit split,
+ List<Integer> columnIds, SearchArgument sarg, String[] columnNames) {
+ return new VectorReader(split, columnIds, sarg, columnNames, cvp);
}
@Override
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/VectorReader.java Mon Feb 2 22:28:15 2015
@@ -36,6 +36,7 @@ public class VectorReader implements Con
private final InputSplit split;
private final List<Integer> columnIds;
private final SearchArgument sarg;
+ private final String[] columnNames;
private final ColumnVectorProducer<?> cvp;
private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
@@ -45,10 +46,11 @@ public class VectorReader implements Con
private ConsumerFeedback<ColumnVectorBatch> feedback;
public VectorReader(InputSplit split, List<Integer> columnIds, SearchArgument sarg,
- ColumnVectorProducer<?> cvp) {
+ String[] columnNames, ColumnVectorProducer<?> cvp) {
this.split = split;
this.columnIds = columnIds;
this.sarg = sarg;
+ this.columnNames = columnNames;
this.cvp = cvp;
}
@@ -56,7 +58,7 @@ public class VectorReader implements Con
// TODO: if some collection is needed, return previous ColumnVectorBatch here
ColumnVectorBatch current = null;
if (feedback == null) {
- feedback = cvp.read(split, columnIds, sarg, this);
+ feedback = cvp.read(split, columnIds, sarg, columnNames, this);
}
if (isClosed) {
throw new AssertionError("next called after close");
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java Mon Feb 2 22:28:15 2015
@@ -27,7 +27,7 @@ import java.util.concurrent.ExecutorServ
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
import org.apache.hadoop.hive.llap.io.encoded.EncodedDataReader;
@@ -42,12 +42,13 @@ public abstract class ColumnVectorProduc
this.executor = executor;
}
+ // TODO#: Given how ORC reads data, it should return this and not separate columns.
static class EncodedColumnBatch {
public EncodedColumnBatch(int colCount) {
- columnDatas = new ColumnBuffer[colCount];
+ columnDatas = new StreamBuffer[colCount][];
columnsRemaining = colCount;
}
- public ColumnBuffer[] columnDatas;
+ public StreamBuffer[][] columnDatas;
public int columnsRemaining;
}
@@ -57,7 +58,7 @@ public abstract class ColumnVectorProduc
// TODO: use array, precreate array based on metadata first? Works for ORC. For now keep dumb.
private final HashMap<BatchKey, EncodedColumnBatch> pendingData =
new HashMap<BatchKey, EncodedColumnBatch>();
- private ConsumerFeedback<ColumnBuffer> upstreamFeedback;
+ private ConsumerFeedback<StreamBuffer> upstreamFeedback;
private final Consumer<ColumnVectorBatch> downstreamConsumer;
private final int colCount;
@@ -66,7 +67,7 @@ public abstract class ColumnVectorProduc
this.colCount = colCount;
}
- public void init(ConsumerFeedback<ColumnBuffer> upstreamFeedback) {
+ public void init(ConsumerFeedback<StreamBuffer> upstreamFeedback) {
this.upstreamFeedback = upstreamFeedback;
}
@@ -85,7 +86,7 @@ public abstract class ColumnVectorProduc
}
}
if (localIsStopped) {
- upstreamFeedback.returnData(data.columnData);
+ returnProcessed(data.streamData);
return;
}
@@ -94,7 +95,7 @@ public abstract class ColumnVectorProduc
// Check if we are stopped and the batch was already cleaned.
localIsStopped = (targetBatch.columnDatas == null);
if (!localIsStopped) {
- targetBatch.columnDatas[data.columnIndex] = data.columnData;
+ targetBatch.columnDatas[data.columnIndex] = data.streamData;
colsRemaining = --targetBatch.columnsRemaining;
if (0 == colsRemaining) {
synchronized (pendingData) {
@@ -107,14 +108,22 @@ public abstract class ColumnVectorProduc
}
}
if (localIsStopped) {
- upstreamFeedback.returnData(data.columnData);
+ returnProcessed(data.streamData);
return;
}
if (0 == colsRemaining) {
ColumnVectorProducer.this.decodeBatch(data.batchKey, targetBatch, downstreamConsumer);
// Batch has been decoded; unlock the buffers in cache
- for (ColumnBuffer cb : targetBatch.columnDatas) {
- upstreamFeedback.returnData(cb);
+ for (StreamBuffer[] columnData : targetBatch.columnDatas) {
+ returnProcessed(columnData);
+ }
+ }
+ }
+
+ private void returnProcessed(StreamBuffer[] data) {
+ for (StreamBuffer sb : data) {
+ if (sb.decRef() == 0) {
+ upstreamFeedback.returnData(sb);
}
}
}
@@ -142,7 +151,7 @@ public abstract class ColumnVectorProduc
}
private void dicardPendingData(boolean isStopped) {
- List<ColumnBuffer> dataToDiscard = new ArrayList<ColumnBuffer>(pendingData.size() * colCount);
+ List<StreamBuffer> dataToDiscard = new ArrayList<StreamBuffer>(pendingData.size() * colCount);
List<EncodedColumnBatch> batches = new ArrayList<EncodedColumnBatch>(pendingData.size());
synchronized (pendingData) {
if (isStopped) {
@@ -153,14 +162,18 @@ public abstract class ColumnVectorProduc
}
for (EncodedColumnBatch batch : batches) {
synchronized (batch) {
- for (ColumnBuffer b : batch.columnDatas) {
- dataToDiscard.add(b);
+ for (StreamBuffer[] bb : batch.columnDatas) {
+ for (StreamBuffer b : bb) {
+ dataToDiscard.add(b);
+ }
}
batch.columnDatas = null;
}
}
- for (ColumnBuffer data : dataToDiscard) {
- upstreamFeedback.returnData(data);
+ for (StreamBuffer data : dataToDiscard) {
+ if (data.decRef() == 0) {
+ upstreamFeedback.returnData(data);
+ }
}
}
@@ -191,13 +204,21 @@ public abstract class ColumnVectorProduc
* @throws IOException
*/
public ConsumerFeedback<ColumnVectorBatch> read(InputSplit split, List<Integer> columnIds,
- SearchArgument sarg, Consumer<ColumnVectorBatch> consumer) throws IOException {
+ SearchArgument sarg, String[] columnNames, Consumer<ColumnVectorBatch> consumer)
+ throws IOException {
// Create the consumer of encoded data; it will coordinate decoding to CVBs.
EncodedDataConsumer edc = new EncodedDataConsumer(consumer, columnIds.size());
// Get the source of encoded data.
EncodedDataProducer<BatchKey> edp = getEncodedDataProducer();
// Then, get the specific reader of encoded data out of the producer.
- EncodedDataReader<BatchKey> reader = edp.getReader(split, columnIds, sarg, edc);
+ /*
+[ERROR] reason: actual argument
+org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer<BatchKey>.EncodedDataConsumer
+cannot be converted to
+org.apache.hadoop.hive.llap.Consumer<
+ org.apache.hadoop.hive.llap.io.api.EncodedColumn<
+ org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey>> by method invocation conversion * */
+ EncodedDataReader<BatchKey> reader = edp.getReader(split, columnIds, sarg, columnNames, edc);
// Set the encoded data reader as upstream feedback for encoded data consumer, and start.
edc.init(reader);
executor.submit(reader);
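The bookkeeping in EncodedDataConsumer above follows a common countdown pattern: encoded data for a batch arrives one column at a time (possibly from different code paths), and the batch is decoded only when the last column lands. A self-contained sketch of that pattern, with hypothetical simplified names rather than the actual EncodedColumnBatch/pendingData fields, assuming one-at-a-time delivery per column:

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the "columnsRemaining" countdown used above.
class BatchAssembler<K> {
  static class Batch {
    final Object[] columnData;
    int columnsRemaining;
    Batch(int colCount) {
      columnData = new Object[colCount];
      columnsRemaining = colCount;
    }
  }

  private final Map<K, Batch> pending = new HashMap<>();
  private final int colCount;

  BatchAssembler(int colCount) {
    this.colCount = colCount;
  }

  // Returns the completed batch when the last column arrives, null otherwise.
  synchronized Batch offer(K batchKey, int colIx, Object data) {
    Batch b = pending.get(batchKey);
    if (b == null) {
      b = new Batch(colCount);
      pending.put(batchKey, b);
    }
    b.columnData[colIx] = data;
    if (--b.columnsRemaining == 0) {
      pending.remove(batchKey);
      return b; // caller decodes, then decRefs/returns the stream buffers upstream
    }
    return null;
  }
}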
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataProducer.java Mon Feb 2 22:28:15 2015
@@ -22,10 +22,11 @@ import java.util.List;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.mapred.InputSplit;
public interface EncodedDataProducer<BatchKey> {
- public EncodedDataReader<BatchKey> getReader(InputSplit split, List<Integer> columnIds,
- SearchArgument sarg, Consumer<EncodedColumn<BatchKey>> consumer);
+ EncodedDataReader<BatchKey> getReader(InputSplit split, List<Integer> columnIds,
+ SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<BatchKey>> consumer);
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/EncodedDataReader.java Mon Feb 2 22:28:15 2015
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.llap.io.e
import java.util.concurrent.Callable;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
/**
* Interface for encoded data readers to implement.
@@ -29,5 +29,5 @@ import org.apache.hadoop.hive.llap.io.ap
* The final threading design will probably change.
*/
public interface EncodedDataReader<BatchKey>
- extends ConsumerFeedback<ColumnBuffer>, Callable<Void> {
+ extends ConsumerFeedback<StreamBuffer>, Callable<Void> {
}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataProducer.java Mon Feb 2 22:28:15 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.llap.io.e
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -30,15 +31,18 @@ import org.apache.hadoop.hive.llap.Consu
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.Cache;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
-import org.apache.hadoop.hive.llap.io.api.EncodedColumn.ColumnBuffer;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.llap.io.api.orc.OrcCacheKey;
+import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
+import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
@@ -49,7 +53,7 @@ import org.apache.hadoop.mapred.InputSpl
public class OrcEncodedDataProducer implements EncodedDataProducer<OrcBatchKey> {
private FileSystem cachedFs = null;
private Configuration conf;
- private OrcMetadataCache metadataCache;
+ private final OrcMetadataCache metadataCache;
// TODO: it makes zero sense to have both at the same time and duplicate data. Add "cache mode".
private final Cache<OrcCacheKey> cache;
private final LowLevelCache lowLevelCache;
@@ -59,24 +63,23 @@ public class OrcEncodedDataProducer impl
private final FileSplit split;
private List<Integer> columnIds;
private final SearchArgument sarg;
+ private final String[] columnNames;
private final Consumer<EncodedColumn<OrcBatchKey>> consumer;
// Read state.
- private int stripeIxFrom, stripeIxTo;
+ private int stripeIxFrom;
private Reader orcReader;
private final String internedFilePath;
/**
- * readState[stripeIx'][colIx'] - bitmask (as long array) of rg-s that are done.
- * Bitmasks are all well-known size so we don't bother with BitSets and such.
- * Each long has natural bit indexes used, so rightmost bits are filled first.
+ * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be
+ * read. Contains only stripes that are read, and only columns included. null => read all RGs.
*/
- private long[][][] readState;
- private int[] rgsPerStripe = null;
+ private boolean[][][] readState;
private boolean isStopped = false, isPaused = false;
public OrcEncodedDataReader(InputSplit split, List<Integer> columnIds,
- SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
+ SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
this.split = (FileSplit)split;
this.internedFilePath = this.split.getPath().toString().intern();
this.columnIds = columnIds;
@@ -84,6 +87,7 @@ public class OrcEncodedDataProducer impl
Collections.sort(this.columnIds);
}
this.sarg = sarg;
+ this.columnNames = columnNames;
this.consumer = consumer;
}
@@ -109,57 +113,128 @@ public class OrcEncodedDataProducer impl
public Void call() throws IOException {
LlapIoImpl.LOG.info("Processing split for " + internedFilePath);
if (isStopped) return null;
- List<StripeInformation> stripes = metadataCache.getStripes(internedFilePath);
- List<Type> types = metadataCache.getTypes(internedFilePath);
orcReader = null;
- if (stripes == null || types == null) {
+ // Get FILE metadata from cache, or create the reader and read it.
+ OrcFileMetadata metadata = metadataCache.getFileMetadata(internedFilePath);
+ if (metadata == null) {
orcReader = createOrcReader(split);
- stripes = metadataCache.getStripes(internedFilePath);
- types = metadataCache.getTypes(internedFilePath);
+ metadata = new OrcFileMetadata(orcReader);
+ metadataCache.putFileMetadata(internedFilePath, metadata);
}
if (columnIds == null) {
- columnIds = new ArrayList<Integer>(types.size());
- for (int i = 1; i < types.size(); ++i) {
+ columnIds = new ArrayList<Integer>(metadata.getTypes().size());
+ for (int i = 1; i < metadata.getTypes().size(); ++i) {
columnIds.add(i);
}
}
- determineWhatToRead(stripes);
+ // Then, determine which stripes to read based on the split.
+ determineStripesToRead(metadata.getStripes());
+ if (readState.length == 0) {
+ consumer.setDone();
+ return null; // No data to read.
+ }
+ int stride = metadata.getRowIndexStride();
+ ArrayList<OrcStripeMetadata> stripesMetadata = null;
+ boolean[] globalIncludes = OrcInputFormat.genIncludedColumns(
+ metadata.getTypes(), columnIds, true);
+ RecordReader[] stripeReaders = new RecordReader[readState.length];
+ if (sarg != null && stride != 0) {
+ // If SARG is present, get relevant stripe metadata from cache or readers.
+ stripesMetadata = readStripesMetadata(metadata, globalIncludes, stripeReaders);
+ }
+
+ // Now, apply SARG if any; w/o sarg, this will just initialize readState.
+ determineRgsToRead(metadata.getStripes(), metadata.getTypes(),
+ globalIncludes, stride, stripesMetadata);
if (isStopped) return null;
- List<Integer>[] stripeColsToRead = produceDataFromCache();
- // readState now contains some 1s for column x rgs that were fetched from cache.
- // TODO: I/O threadpool would be here (or below); for now, linear
- for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+ // Get data from high-level cache; if some cols are fully in cache, this will also
+ // give us the modified list of columns to read for every stripe (null means all).
+ List<Integer>[] stripeColsToRead = produceDataFromCache(metadata.getStripes(), stride);
+ // readState has been modified for column x rgs that were fetched from cache.
+
+ // Then, create the readers for each stripe and prepare to read.
+ for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+ List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
+ RecordReader stripeReader = stripeReaders[stripeIxMod];
+ if (colsToRead == null) {
+ colsToRead = columnIds;
+ } else if (colsToRead.isEmpty()) {
+ if (stripeReader != null) {
+ stripeReader.close();
+ stripeReaders[stripeIxMod] = null;
+ }
+ continue; // All the data for this stripe was in cache.
+ } else if (stripeReader != null) {
+ // We have created the reader to read stripe metadata with all includes.
+ // We will now recreate the reader with narrower included columns (due to cache).
+ stripeReader.close();
+ stripeReader = null;
+ }
+
+ if (stripeReader != null) continue; // We already have a reader.
+ // Create RecordReader that will be used to read only this stripe.
+ StripeInformation si = metadata.getStripes().get(stripeIxFrom + stripeIxMod);
+ boolean[] stripeIncludes = OrcInputFormat.genIncludedColumns(
+ metadata.getTypes(), colsToRead, true);
+ if (orcReader == null) {
+ orcReader = createOrcReader(split);
+ }
+ stripeReader = orcReader.rows(si.getOffset(), si.getLength(), stripeIncludes);
+ stripeReader.prepareEncodedColumnRead();
+ stripeReaders[stripeIxMod] = stripeReader;
+ }
+
+ // We now have one reader per stripe that needs to be read. Read.
+ // TODO: I/O threadpool would be here - one thread per stripe; for now, linear.
+ OrcBatchKey stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
+ for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+ RecordReader stripeReader = stripeReaders[stripeIxMod];
+ if (stripeReader == null) continue; // No need to read this stripe, see above.
List<Integer> colsToRead = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
- long[][] colRgs = readState[stripeIxMod];
if (colsToRead == null) {
colsToRead = columnIds;
}
- if (colsToRead.isEmpty()) continue; // All the data for this stripe was in cache.
- if (colsToRead.size() != colRgs.length) {
+ boolean[][] colRgs = readState[stripeIxMod];
+ if (colsToRead != null && colsToRead.size() != colRgs.length) {
// We are reading subset of the original columns, remove unnecessary bitmasks.
- long[][] colRgs2 = new long[colsToRead.size()][];
+ boolean[][] colRgs2 = new boolean[colsToRead.size()][];
for (int i = 0, i2 = -1; i < colRgs.length; ++i) {
if (colRgs[i] == null) continue;
colRgs2[++i2] = colRgs[i];
}
colRgs = colRgs2;
}
- int stripeIx = stripeIxFrom + stripeIxMod;
- StripeInformation si = stripes.get(stripeIx);
- int rgCount = rgsPerStripe[stripeIxMod];
- boolean[] includes = OrcInputFormat.genIncludedColumns(types, colsToRead, true);
- if (orcReader == null) {
- orcReader = createOrcReader(split);
+
+ // Get stripe metadata. We might have read it earlier for RG filtering.
+ OrcStripeMetadata stripeMetadata;
+ int stripeIx = stripeIxMod + stripeIxFrom;
+ if (stripesMetadata != null) {
+ stripeMetadata = stripesMetadata.get(stripeIxMod);
+ } else {
+ stripeKey.stripeIx = stripeIx;
+ stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
+ if (stripeMetadata == null) {
+ stripeMetadata = new OrcStripeMetadata(stripeReader, stripeKey.stripeIx);
+ metadataCache.putStripeMetadata(stripeKey, stripeMetadata);
+ stripeKey = new OrcBatchKey(internedFilePath, -1, 0);
+ }
}
- RecordReader stripeReader = orcReader.rows(si.getOffset(), si.getLength(), includes);
+ stripeReader.setRowIndex(stripeMetadata.getRowIndexes());
+
// In case if we have high-level cache, we will intercept the data and add it there;
// otherwise just pass the data directly to the consumer.
Consumer<EncodedColumn<OrcBatchKey>> consumer = (cache == null) ? this.consumer : this;
- stripeReader.readEncodedColumns(colRgs, rgCount, consumer, lowLevelCache);
+ // This is where I/O happens. This is a sync call that will feed data to the consumer.
+ try {
+ stripeReader.readEncodedColumns(stripeIx, colRgs, lowLevelCache, consumer);
+ } catch (Throwable t) {
+ consumer.setError(t);
+ }
stripeReader.close();
}
+ // Done with all the things.
consumer.setDone();
if (DebugUtils.isTraceMttEnabled()) {
LlapIoImpl.LOG.info("done processing " + split);
@@ -167,16 +242,72 @@ public class OrcEncodedDataProducer impl
return null;
}
+ private ArrayList<OrcStripeMetadata> readStripesMetadata(OrcFileMetadata metadata,
+ boolean[] globalInc, RecordReader[] stripeReaders) throws IOException {
+ ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(stripeReaders.length);
+ OrcBatchKey stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
+ for (int stripeIxMod = 0; stripeIxMod < stripeReaders.length; ++stripeIxMod) {
+ stripeKey.stripeIx = stripeIxMod + stripeIxFrom;
+ OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey);
+ if (value == null) {
+ // Metadata not present in cache - get it from the reader and put in cache.
+ if (orcReader == null) {
+ orcReader = createOrcReader(split);
+ }
+ StripeInformation si = metadata.getStripes().get(stripeKey.stripeIx);
+ stripeReaders[stripeIxMod] = orcReader.rows(si.getOffset(), si.getLength(), globalInc);
+ stripeReaders[stripeIxMod].prepareEncodedColumnRead();
+ value = new OrcStripeMetadata(stripeReaders[stripeIxMod], stripeKey.stripeIx);
+ metadataCache.putStripeMetadata(stripeKey, value);
+ // Create new key object to reuse for gets; we've used the old one to put in cache.
+ stripeKey = new OrcBatchKey(internedFilePath, 0, 0);
+ }
+ result.add(value);
+ }
+ return result;
+ }
+
@Override
- public void returnData(ColumnBuffer data) {
+ public void returnData(StreamBuffer data) {
lowLevelCache.releaseBuffers(data.cacheBuffers);
}
- private void determineWhatToRead(List<StripeInformation> stripes) {
- // The unit of caching for ORC is (stripe x column) (see OrcBatchKey).
+ private void determineRgsToRead(List<StripeInformation> stripes, List<Type> types,
+ boolean[] globalIncludes, int rowIndexStride, ArrayList<OrcStripeMetadata> metadata)
+ throws IOException {
+ SargApplier sargApp = null;
+ if (sarg != null) {
+ String[] colNamesForSarg = OrcInputFormat.getSargColumnNames(
+ columnNames, types, globalIncludes, OrcInputFormat.isOriginal(orcReader));
+ sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride);
+ }
+ // readState should have been initialized by this time with an empty array.
+ for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
+ int originalStripeIx = stripeIxMod + stripeIxFrom;
+ StripeInformation stripe = stripes.get(originalStripeIx);
+ int rgCount = getRgCount(stripe, rowIndexStride);
+ boolean[] rgsToRead = null;
+ if (sargApp != null) {
+ rgsToRead = sargApp.pickRowGroups(stripe, metadata.get(stripeIxMod).getRowIndexes());
+ }
+ assert rgsToRead == null || rgsToRead.length == rgCount;
+ readState[stripeIxMod] = new boolean[columnIds.size()][];
+ for (int j = 0; j < columnIds.size(); ++j) {
+ readState[stripeIxMod][j] = (rgsToRead == null) ? null :
+ Arrays.copyOf(rgsToRead, rgsToRead.length);
+ }
+ }
+ }
+
+ private int getRgCount(StripeInformation stripe, int rowIndexStride) {
+ return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
+ }
+
+ public void determineStripesToRead(List<StripeInformation> stripes) {
+ // The unit of caching for ORC is (rg x column) (see OrcBatchKey).
long offset = split.getStart(), maxOffset = offset + split.getLength();
- stripeIxFrom = stripeIxTo = -1;
- int stripeIx = 0;
+ stripeIxFrom = -1;
+ int stripeIxTo = -1;
if (LlapIoImpl.LOG.isDebugEnabled()) {
String tmp = "FileSplit {" + split.getStart() + ", " +
split.getLength() + "}; stripes ";
for (StripeInformation stripe : stripes) {
@@ -185,7 +316,7 @@ public class OrcEncodedDataProducer impl
LlapIoImpl.LOG.debug(tmp);
}
- List<Integer> stripeRgCounts = new ArrayList<Integer>(stripes.size());
+ int stripeIx = 0;
for (StripeInformation stripe : stripes) {
long stripeStart = stripe.getOffset();
if (offset > stripeStart) continue;
@@ -204,9 +335,6 @@ public class OrcEncodedDataProducer impl
stripeIxTo = stripeIx;
break;
}
- int rgCount = (int)Math.ceil(
- (double)stripe.getNumberOfRows() / orcReader.getRowIndexStride());
- stripeRgCounts.add(rgCount);
++stripeIx;
}
if (stripeIxTo == -1) {
@@ -215,54 +343,51 @@ public class OrcEncodedDataProducer impl
}
stripeIxTo = stripeIx;
}
- readState = new long[stripeRgCounts.size()][][];
- for (int i = 0; i < stripeRgCounts.size(); ++i) {
- int bitmaskSize = align64(stripeRgCounts.get(i)) >>> 6;
- readState[i] = new long[columnIds.size()][];
- for (int j = 0; j < columnIds.size(); ++j) {
- readState[i][j] = new long[bitmaskSize];
- }
- }
- // TODO: HERE, we need to apply sargs and mark RGs that are filtered as 1s
- rgsPerStripe = new int[stripeRgCounts.size()];
- for (int i = 0; i < rgsPerStripe.length; ++i) {
- rgsPerStripe[i] = stripeRgCounts.get(i);
- }
+ readState = new boolean[stripeIxTo - stripeIxFrom][][];
}
// TODO: split by stripe? we do everything by stripe, and it might be faster
- private List<Integer>[] produceDataFromCache() {
+ private List<Integer>[] produceDataFromCache(
+ List<StripeInformation> stripes, int rowIndexStride) {
if (cache == null) return null;
OrcCacheKey key = new OrcCacheKey(internedFilePath, -1, -1, -1);
+ // For each stripe, keep a list of columns that are not fully in cache (null => all of them).
@SuppressWarnings("unchecked") // No generics arrays - "J" in "Java" stands for "joke".
List<Integer>[] stripeColsNotInCache = new List[readState.length];
for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
key.stripeIx = stripeIxFrom + stripeIxMod;
- long[][] cols = readState[stripeIxMod];
- int rgCount = rgsPerStripe[stripeIxMod];
+ boolean[][] cols = readState[stripeIxMod];
+ // TODO## at self-CR, see that colIx business here was not screwed up
for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
+ boolean[] readMask = cols[colIxMod];
key.colIx = columnIds.get(colIxMod);
- long[] doneMask = cols[colIxMod];
+ // Assume first all RGs will be in cache; calculate or get the RG count.
boolean areAllRgsInCache = true;
+ int rgCount = readMask != null ? readMask.length
+ : getRgCount(stripes.get(key.stripeIx), rowIndexStride);
for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
- int maskIndex = rgIx >>> 6, maskBit = 1 << (rgIx & 63);
- if ((doneMask[maskIndex] & maskBit) != 0) continue; // RG eliminated by SARG
+ if (readMask != null && !readMask[rgIx]) continue; // RG eliminated by SARG
key.rgIx = rgIx;
- ColumnBuffer cached = cache.get(key);
+ StreamBuffer cached = cache.get(key);
if (cached == null) {
areAllRgsInCache = false;
continue;
}
+ // RG was in cache; send it over to the consumer.
// TODO: pool of EncodedColumn-s objects. Someone will need to return them though.
- EncodedColumn<OrcBatchKey> col = new EncodedColumn<OrcBatchKey>(
- key.copyToPureBatchKey(), key.colIx, cached);
+ EncodedColumn<OrcBatchKey> col = null;
+ // TODO# new EncodedColumn<OrcBatchKey>(key.copyToPureBatchKey(), key.colIx, cached);
consumer.consumeData(col);
- doneMask[maskIndex] = doneMask[maskIndex] | maskBit;
+ if (readMask == null) {
+ // We were going to read all RGs, but now that some were in cache, allocate the mask.
+ cols[colIxMod] = readMask = new boolean[rgCount];
+ Arrays.fill(readMask, true);
+ }
+ readMask[rgIx] = false; // Got from cache, don't read from disk.
}
- boolean hasFetchList = stripeColsNotInCache[stripeIxMod] != null;
+ boolean hasExplicitColList = stripeColsNotInCache[stripeIxMod] != null;
if (areAllRgsInCache) {
- cols[colIxMod] = null; // No need for bitmask, all rgs are done.
- if (!hasFetchList) {
+ if (!hasExplicitColList) {
// All rgs for this stripe x column were fetched from cache. If this is the first
// such column, create custom, smaller list of columns to fetch later for this
// stripe (default is all the columns originally requested). Add all previous
@@ -272,7 +397,7 @@ public class OrcEncodedDataProducer impl
stripeColsNotInCache[stripeIxMod].addAll(columnIds.subList(0, colIxMod));
}
}
- } else if (hasFetchList) {
+ } else if (hasExplicitColList) {
// Only a subset of original columnIds need to be fetched for this stripe;
// add the current one to this sublist.
stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
@@ -292,11 +417,14 @@ public class OrcEncodedDataProducer impl
// Store object in cache; create new key object - cannot be reused.
assert cache != null;
OrcCacheKey key = new OrcCacheKey(data.batchKey, data.columnIndex);
- ColumnBuffer cached = cache.cacheOrGet(key, data.columnData);
- if (data.columnData != cached) {
+ // TODO#: change type of cache and restore this
+ /*
+ StreamBuffer cached = cache.cacheOrGet(key, data.columnData);
+ if (data.streamData != cached) {
lowLevelCache.releaseBuffers(data.columnData.cacheBuffers);
data.columnData = cached;
}
+ */
consumer.consumeData(data);
}
@@ -312,16 +440,9 @@ public class OrcEncodedDataProducer impl
if ("pfile".equals(path.toUri().getScheme())) {
fs = path.getFileSystem(conf); // Cannot use cached FS due to hive tests' proxy FS.
}
- if (metadataCache == null) {
- metadataCache = new OrcMetadataCache(cachedFs, path, conf);
- }
return OrcFile.createReader(path,
OrcFile.readerOptions(conf).filesystem(fs));
}
- private static int align64(int number) {
- return ((number + 63) & ~63);
- }
-
public OrcEncodedDataProducer(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
Configuration conf) throws IOException {
// We assume all splits will come from the same FS.
@@ -329,12 +450,12 @@ public class OrcEncodedDataProducer impl
this.cache = cache;
this.lowLevelCache = lowLevelCache;
this.conf = conf;
- this.metadataCache = null;
+ this.metadataCache = new OrcMetadataCache();
}
@Override
public EncodedDataReader<OrcBatchKey> getReader(InputSplit split, List<Integer> columnIds,
- SearchArgument sarg, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
- return new OrcEncodedDataReader(split, columnIds, sarg, consumer);
+ SearchArgument sarg, String[] columnNames, Consumer<EncodedColumn<OrcBatchKey>> consumer) {
+ return new OrcEncodedDataReader(split, columnIds, sarg, columnNames, consumer);
}
}
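For readers following the new row-group bookkeeping above: getRgCount() computes ceil(numberOfRows / rowIndexStride) row groups per stripe, and readState[stripe][column] is either null (read every RG) or a boolean mask with one flag per RG, allocated lazily when a SARG or the cache eliminates some RGs. A small, self-contained worked example, with hypothetical stripe sizes and standalone code rather than the actual reader classes:

import java.util.Arrays;

class ReadStateSketch {
  // Same formula as getRgCount() in the patch: RGs per stripe given the index stride.
  static int rgCount(long stripeRows, int rowIndexStride) {
    return (int) Math.ceil((double) stripeRows / rowIndexStride);
  }

  public static void main(String[] args) {
    int stride = 10000;
    long[] stripeRows = {25000, 10000, 9999}; // hypothetical stripe row counts
    int columns = 2;

    // readState[stripe][column] == null means "read every RG of that column".
    boolean[][][] readState = new boolean[stripeRows.length][columns][];

    // Suppose RG 1 of stripe 0, column 0 was filtered out (or found in cache):
    // allocate the mask lazily, default everything to "read", then clear that RG.
    int rgs = rgCount(stripeRows[0], stride); // 3 row groups
    boolean[] mask = new boolean[rgs];
    Arrays.fill(mask, true);
    mask[1] = false;
    readState[0][0] = mask;

    System.out.println("RGs in stripe 0: " + rgs); // prints 3
  }
}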
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java?rev=1656595&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcFileMetadata.java Mon Feb 2 22:28:15 2015
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+
+public class OrcFileMetadata {
+ private CompressionKind compressionKind;
+ private int compressionBufferSize;
+ private List<OrcProto.Type> types;
+ private List<StripeInformation> stripes;
+ private int rowIndexStride;
+
+ public OrcFileMetadata(Reader reader) {
+ setCompressionKind(reader.getCompression());
+ setCompressionBufferSize(reader.getCompressionSize());
+ setStripes(reader.getStripes());
+ setTypes(reader.getTypes());
+ setRowIndexStride(reader.getRowIndexStride());
+ }
+
+ public List<StripeInformation> getStripes() {
+ return stripes;
+ }
+
+ public void setStripes(List<StripeInformation> stripes) {
+ this.stripes = stripes;
+ }
+
+ public CompressionKind getCompressionKind() {
+ return compressionKind;
+ }
+
+ public void setCompressionKind(CompressionKind compressionKind) {
+ this.compressionKind = compressionKind;
+ }
+
+ public int getCompressionBufferSize() {
+ return compressionBufferSize;
+ }
+
+ public void setCompressionBufferSize(int compressionBufferSize) {
+ this.compressionBufferSize = compressionBufferSize;
+ }
+
+ public List<OrcProto.Type> getTypes() {
+ return types;
+ }
+
+ public void setTypes(List<OrcProto.Type> types) {
+ this.types = types;
+ }
+
+ public int getRowIndexStride() {
+ return rowIndexStride;
+ }
+
+ public void setRowIndexStride(int rowIndexStride) {
+ this.rowIndexStride = rowIndexStride;
+ }
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataCache.java Mon Feb 2 22:28:15 2015
@@ -19,79 +19,48 @@
package org.apache.hadoop.hive.llap.io.metadata;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.ExecutionException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
+import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
* ORC-specific metadata cache.
+ * TODO: should be merged with main cache somehow if we find this takes too much memory
*/
public class OrcMetadataCache {
private static final int DEFAULT_CACHE_ACCESS_CONCURRENCY = 10;
- private static final int DEFAULT_MAX_CACHE_ENTRIES = 100;
- private static Cache<String, OrcMetadata> METADATA;
+ private static final int DEFAULT_MAX_FILE_ENTRIES = 1000;
+ private static final int DEFAULT_MAX_STRIPE_ENTRIES = 10000;
+ private static Cache<String, OrcFileMetadata> METADATA;
+ private static Cache<OrcBatchKey, OrcStripeMetadata> STRIPE_METADATA;
static {
METADATA = CacheBuilder.newBuilder()
.concurrencyLevel(DEFAULT_CACHE_ACCESS_CONCURRENCY)
- .maximumSize(DEFAULT_MAX_CACHE_ENTRIES)
+ .maximumSize(DEFAULT_MAX_FILE_ENTRIES)
+ .build();
+ STRIPE_METADATA = CacheBuilder.newBuilder()
+ .concurrencyLevel(DEFAULT_CACHE_ACCESS_CONCURRENCY)
+ .maximumSize(DEFAULT_MAX_STRIPE_ENTRIES)
.build();
- }
-
- private Path path;
- private OrcMetadataLoader loader;
-
- public OrcMetadataCache(FileSystem fs, Path path, Configuration conf) {
- this.path = path;
- this.loader = new OrcMetadataLoader(fs, path, conf);
- }
-
- public CompressionKind getCompression(String pathString) throws IOException {
- try {
- return METADATA.get(pathString, loader).getCompressionKind();
- } catch (ExecutionException e) {
- throw new IOException("Unable to load orc metadata for " +
path.toString(), e);
}
- }
- public int getCompressionBufferSize(String pathString) throws IOException {
- try {
- return METADATA.get(pathString, loader).getCompressionBufferSize();
- } catch (ExecutionException e) {
- throw new IOException("Unable to load orc metadata for " +
path.toString(), e);
- }
+ public void putFileMetadata(String filePath, OrcFileMetadata metaData) {
+ METADATA.put(filePath, metaData);
}
- public List<OrcProto.Type> getTypes(String pathString) throws IOException {
- try {
- return METADATA.get(pathString, loader).getTypes();
- } catch (ExecutionException e) {
- throw new IOException("Unable to load orc metadata for " +
path.toString(), e);
- }
+ public void putStripeMetadata(OrcBatchKey stripeKey, OrcStripeMetadata
metaData) {
+ STRIPE_METADATA.put(stripeKey, metaData);
}
- public List<StripeInformation> getStripes(String pathString) throws IOException {
- try {
- return METADATA.get(pathString, loader).getStripes();
- } catch (ExecutionException e) {
- throw new IOException("Unable to load orc metadata for " +
path.toString(), e);
- }
+ public OrcStripeMetadata getStripeMetadata(OrcBatchKey stripeKey) throws
IOException {
+ return STRIPE_METADATA.getIfPresent(stripeKey);
}
- // public boolean[] getIncludedRowGroups(String pathString, SearchArgument sarg, int stripeIdx) throws IOException {
- // try {
- // return METADATA.get(pathString, loader).getStripeToRowIndexEntries();
- // } catch (ExecutionException e) {
- // throw new IOException("Unable to load orc metadata for " + path.toString(), e);
- // }
- // }
+ public OrcFileMetadata getFileMetadata(String pathString) throws IOException {
+ return METADATA.getIfPresent(pathString);
+ }
}
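The rewritten OrcMetadataCache above is a thin wrapper over two bounded Guava caches, with the loading moved out to the caller (OrcEncodedDataReader populates on a miss). A self-contained sketch of that usage pattern, with a hypothetical String key/value pair standing in for the ORC metadata objects:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

class MetadataCacheSketch {
  // Bounded, concurrency-tuned cache, as configured in the patch.
  private static final Cache<String, String> CACHE = CacheBuilder.newBuilder()
      .concurrencyLevel(10)
      .maximumSize(1000)
      .build();

  // Manual read-through: getIfPresent, and put on a miss (the caller does the loading,
  // e.g. by creating an ORC Reader and extracting file/stripe metadata).
  static String getOrLoad(String key) {
    String value = CACHE.getIfPresent(key);
    if (value == null) {
      value = "loaded-" + key; // stand-in for the actual metadata load
      CACHE.put(key, value);
    }
    return value;
  }
}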
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcMetadataLoader.java Mon Feb 2 22:28:15 2015
@@ -18,50 +18,21 @@
package org.apache.hadoop.hive.llap.io.metadata;
-import static org.apache.hadoop.hive.ql.io.orc.OrcFile.readerOptions;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.llap.io.orc.OrcFile;
import org.apache.hadoop.hive.llap.io.orc.Reader;
-import org.apache.hadoop.hive.llap.io.orc.RecordReader;
-import org.apache.hadoop.hive.ql.io.orc.OrcProto;
-import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
-
-public class OrcMetadataLoader implements Callable<OrcMetadata> {
- private FileSystem fs;
- private Path path;
- private Configuration conf;
-
- public OrcMetadataLoader(FileSystem fs, Path path, Configuration conf) {
- this.fs = fs;
- this.path = path;
- this.conf = conf;
+
+// TODO: this class is pointless
+public class OrcMetadataLoader implements Callable<OrcFileMetadata> {
+ private Reader reader;
+
+ public OrcMetadataLoader(Reader reader) {
+ this.reader = reader;
}
@Override
- public OrcMetadata call() throws Exception {
- Reader reader = OrcFile.createLLAPReader(path, readerOptions(conf).filesystem(fs));
- OrcMetadata orcMetadata = new OrcMetadata();
- orcMetadata.setCompressionKind(reader.getCompression());
- orcMetadata.setCompressionBufferSize(reader.getCompressionSize());
- List<StripeInformation> stripes = reader.getStripes();
- orcMetadata.setStripes(stripes);
- Map<Integer, List<OrcProto.ColumnEncoding>> stripeColEnc = new HashMap<Integer, List<OrcProto.ColumnEncoding>>();
- Map<Integer, OrcProto.RowIndex[]> stripeRowIndices = new HashMap<Integer, OrcProto.RowIndex[]>();
- RecordReader rows = reader.rows();
- for (int i = 0; i < stripes.size(); i++) {
- stripeColEnc.put(i, rows.getColumnEncodings(i));
- stripeRowIndices.put(i, rows.getRowIndexEntries(i));
- }
- orcMetadata.setStripeToColEncodings(stripeColEnc);
- orcMetadata.setStripeToRowIndexEntries(stripeRowIndices);
- return orcMetadata;
+ public OrcFileMetadata call() throws Exception {
+ return new OrcFileMetadata(reader);
}
}
Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java?rev=1656595&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java Mon Feb 2 22:28:15 2015
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+
+public class OrcStripeMetadata {
+ // TODO#: add encoding and stream list
+ OrcProto.RowIndex[] rowIndexes;
+
+ public OrcStripeMetadata(RecordReader reader, int stripeIx) throws IOException {
+ rowIndexes = reader.getCurrentRowIndexEntries();
+ }
+
+ public OrcProto.RowIndex[] getRowIndexes() {
+ return rowIndexes;
+ }
+
+ public void setRowIndexes(OrcProto.RowIndex[] rowIndexes) {
+ this.rowIndexes = rowIndexes;
+ }
+}
Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/LLAPRecordReaderImpl.java Mon Feb 2 22:28:15 2015
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.llap.io.ap
import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.*;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -45,24 +47,6 @@ public class LLAPRecordReaderImpl extend
}
@Override
- public OrcProto.RowIndex[] getRowIndexEntries(int stripeIdx) throws IOException {
- return readRowIndex(stripeIdx);
- }
-
- @Override
- public List<OrcProto.ColumnEncoding> getColumnEncodings(int stripeIdx) throws IOException {
- StripeInformation si = stripes.get(stripeIdx);
- OrcProto.StripeFooter sf = readStripeFooter(si);
- return sf.getColumnsList();
- }
-
- @Override
- public boolean[] getIncludedRowGroups(int stripeIdx) throws IOException {
- currentStripe = stripeIdx;
- return pickRowGroups();
- }
-
- @Override
public boolean hasNext() throws IOException {
return false;
}
@@ -96,10 +80,4 @@ public class LLAPRecordReaderImpl extend
public void seekToRow(long rowCount) throws IOException {
}
-
- @Override
- public void readEncodedColumns(long[][] colRgs, int rgCount,
- Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache) {
- throw new UnsupportedOperationException("not implemented");
- }
}
Modified:
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
---
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java
(original)
+++
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/orc/RecordReader.java
Mon Feb 2 22:28:15 2015
@@ -26,27 +26,4 @@ import org.apache.hadoop.hive.ql.io.orc.
*
*/
public interface RecordReader extends org.apache.hadoop.hive.ql.io.orc.RecordReader {
- /**
- * Return all row index entries for the specified stripe index.
- *
- * @param stripeIdx - stripe index within orc file
- * @return - all row index entries
- */
- OrcProto.RowIndex[] getRowIndexEntries(int stripeIdx) throws IOException;
-
- /**
- * Return column encodings of all columns for the specified stripe index.
- *
- * @param stripeIdx - stripe index within orc file
- * @return - column encodings of all columns
- */
- List<OrcProto.ColumnEncoding> getColumnEncodings(int stripeIdx) throws IOException;
-
- /**
- * Return the row groups that satisfy the SARG condition for the specified stripe index.
- *
- * @param stripeIdx - stripe index within orc file
- * @return - row groups qualifying the SARG
- */
- boolean[] getIncludedRowGroups(int stripeIdx) throws IOException;
}
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
Mon Feb 2 22:28:15 2015
@@ -21,17 +21,21 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.ListIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.DiskRange;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumn.StreamBuffer;
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
import com.google.common.annotations.VisibleForTesting;
abstract class InStream extends InputStream {
private static final Log LOG = LogFactory.getLog(InStream.class);
@@ -125,6 +129,15 @@ abstract class InStream extends InputStr
}
}
+ private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
+ // TODO: use the same pool as the ORC readers
+ if (isDirect) {
+ return ByteBuffer.allocateDirect(size);
+ } else {
+ return ByteBuffer.allocate(size);
+ }
+ }
+
private static class CompressedStream extends InStream {
private final String fileName;
private final String name;
@@ -154,15 +167,6 @@ abstract class InStream extends InputStr
this.cache = cache;
}
- private ByteBuffer allocateBuffer(int size, boolean isDirect) {
- // TODO: use the same pool as the ORC readers
- if (isDirect) {
- return ByteBuffer.allocateDirect(size);
- } else {
- return ByteBuffer.allocate(size);
- }
- }
-
// TODO: This should not be used for main path.
private final LlapMemoryBuffer[] singleAllocDest = new LlapMemoryBuffer[1];
private void allocateForUncompressed(int size, boolean isDirect) {
@@ -295,7 +299,7 @@ abstract class InStream extends InputStr
}
/* slices a read only contiguous buffer of chunkLength */
- private ByteBuffer slice(int chunkLength) throws IOException {
+ private ByteBuffer slice(int chunkLength) throws IOException {
int len = chunkLength;
final long oldOffset = currentOffset;
ByteBuffer slice;
@@ -478,4 +482,204 @@ abstract class InStream extends InputStr
return new CompressedStream(fileName, name, input, length, codec,
bufferSize, cache);
}
}
+
+ private static class ProcCacheChunk extends CacheChunk {
+ public ProcCacheChunk(long cbStartOffset, long cbEndOffset,
+ boolean isCompressed, ByteBuffer originalData, LlapMemoryBuffer targetBuffer) {
+ super(targetBuffer, cbStartOffset, cbEndOffset);
+ this.isCompressed = isCompressed;
+ this.originalData = originalData;
+ }
+
+ boolean isCompressed;
+ ByteBuffer originalData = null;
+ }
+
+ /**
+ * Uncompresses part of the stream. RGs can overlap, so we cannot just decompress
+ * and remove what we have returned. We keep the iterator as a "hint" position instead.
+ * TODO: Java LinkedList and its iterator have a really awkward interface. Replace with our own simple one?
+ */
+ public static void uncompressStream(String fileName,
+ ListIterator<DiskRange> ranges,
+ CompressionCodec codec, int bufferSize, LowLevelCache cache,
+ long cOffset, long endCOffset, StreamBuffer colBuffer)
+ throws IOException {
+ // TODO#: account for cOffsets being -1 after finishing the normal methods.
+ colBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
+ List<ProcCacheChunk> toDecompress = new ArrayList<ProcCacheChunk>();
+
+ // Find our bearings in the stream. Normally, iter will already point either to where we
+ // want to be, or just before. However, RGs can overlap due to encoding, so we may have
+ // to return to a previous block.
+ DiskRange current = findCompressedPosition(ranges, cOffset);
+
+ // Go thru the blocks; add stuff to results and prepare the decompression work (see below).
+ int nextCbOffset = (cOffset >= 0) ? (int)(cOffset - current.offset) : -1;
+ long currentCOffset = cOffset;
+ while (true) {
+ if (current instanceof CacheChunk) {
+ // This is a cached compression buffer, add as is.
+ if (nextCbOffset > 0) throw new AssertionError("Compressed offset in the middle of cb");
+ CacheChunk cc = (CacheChunk)current;
+ colBuffer.cacheBuffers.add(cc.buffer);
+ currentCOffset = cc.end;
+ } else {
+ // This is a compressed buffer. We need to uncompress it; the buffer can comprise
+ // several disk ranges, so we might need to combine them.
+ BufferChunk bc = (BufferChunk)current;
+ // TODO#: DOUBLE check the iterator state.
+ int chunkLength = addOneCompressionBuffer(bc, ranges, bufferSize,
+ cache, colBuffer.cacheBuffers, toDecompress, nextCbOffset);
+ currentCOffset = bc.offset + chunkLength;
+ }
+ nextCbOffset = -1;
+ if ((endCOffset >= 0 && currentCOffset >= endCOffset) || !ranges.hasNext()) {
+ break;
+ }
+ current = ranges.next();
+ }
+
+ // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
+ // data and some unallocated membufs for decompression. toDecompress contains all the work we
+ // need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
+ // has also been adjusted to point to these buffers instead of compressed data for the ranges.
+ // Allocate the buffers, prepare cache keys.
+ LlapMemoryBuffer[] targetBuffers = new LlapMemoryBuffer[toDecompress.size()];
+ DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
+ int ix = 0;
+ for (ProcCacheChunk chunk : toDecompress) {
+ cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store DiskRange.
+ targetBuffers[ix] = chunk.buffer;
+ ++ix;
+ }
+ cache.allocateMultiple(targetBuffers, bufferSize);
+
+ // Now decompress (or copy) the data into cache buffers.
+ for (ProcCacheChunk chunk : toDecompress) {
+ if (chunk.isCompressed) {
+ codec.decompress(chunk.originalData, chunk.buffer.byteBuffer);
+ } else {
+ chunk.buffer.byteBuffer.put(chunk.originalData); // Copy uncompressed data to cache.
+ }
+ chunk.originalData = null; // TODO#: are we supposed to release this to zcr in some cases?
+ }
+
+ // Finally, put data to cache.
+ cache.putFileData(fileName, cacheKeys, targetBuffers);
+ }
+
+ private static DiskRange findCompressedPosition(
+ ListIterator<DiskRange> ranges, long cOffset) {
+ if (cOffset < 0) return ranges.next();
+ DiskRange current = null;
+ boolean doCallNext = false;
+ if (ranges.hasNext()) {
+ current = ranges.next();
+ } else if (ranges.hasPrevious()) {
+ current = ranges.previous();
+ doCallNext = true;
+ }
+ // We expect the offset to be valid. TODO: rather, validate it.
+ while (current.end <= cOffset) {
+ current = ranges.next();
+ doCallNext = false;
+ }
+ while (current.offset > cOffset) {
+ current = ranges.previous();
+ doCallNext = true;
+ }
+ if (doCallNext) {
+ // TODO: WTF?
+ ranges.next(); // We called previous, make sure next is the real next and not current.
+ }
+ return current;
+ }
+
+
+ private static int addOneCompressionBuffer(BufferChunk current,
+ ListIterator<DiskRange> ranges, int bufferSize,
+ LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers,
+ List<ProcCacheChunk> toDecompress, int nextCbOffsetExpected) throws IOException {
+ // TODO#: HERE
+ ByteBuffer slice = null;
+ ByteBuffer compressed = current.chunk;
+ if (nextCbOffsetExpected >= 0 && nextCbOffsetExpected != compressed.position()) {
+ throw new AssertionError("We don't know what we are doing anymore");
+ }
+ long cbStartOffset = current.offset + compressed.position();
+ int b0 = compressed.get() & 0xff;
+ int b1 = compressed.get() & 0xff;
+ int b2 = compressed.get() & 0xff;
+ int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+ if (chunkLength > bufferSize) {
+ throw new IllegalArgumentException("Buffer size too small. size = " +
+ bufferSize + " needed = " + chunkLength);
+ }
+ boolean isUncompressed = ((b0 & 0x01) == 1);
+ if (compressed.remaining() >= chunkLength) {
+ // Simple case - CB fits entirely in the disk range.
+ slice = compressed.slice();
+ slice.limit(chunkLength);
+ addOneCompressionBlockByteBuffer(slice, isUncompressed, ranges, cache, compressed,
+ cbStartOffset, chunkLength, toDecompress, cacheBuffers);
+ return chunkLength;
+ }
+
+ // TODO: we could remove extra copy for isUncompressed case.
+ // We need to consolidate 2 or more buffers into one to decompress.
+ ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
+ int remaining = chunkLength - compressed.remaining();
+ copy.put(compressed);
+ ranges.remove();
+
+ while (ranges.hasNext()) {
+ DiskRange range = ranges.next();
+ if (!(range instanceof BufferChunk)) {
+ throw new IOException("Trying to extend compressed block into
uncompressed block");
+ }
+ compressed = range.getData();
+ if (compressed.remaining() >= remaining) {
+ slice = compressed.slice();
+ slice.limit(remaining);
+ copy.put(slice);
+ addOneCompressionBlockByteBuffer(copy, isUncompressed, ranges, cache, compressed,
+ cbStartOffset, chunkLength, toDecompress, cacheBuffers);
+ return chunkLength;
+ }
+ remaining -= compressed.remaining();
+ copy.put(compressed);
+ ranges.remove();
+ }
+ throw new IOException("EOF in while trying to read "
+ + chunkLength + " bytes at " + cbStartOffset);
+ }
+
+ private static void addOneCompressionBlockByteBuffer(
+ ByteBuffer data, boolean isUncompressed,
+ ListIterator<DiskRange> ranges, LowLevelCache cache,
+ ByteBuffer compressed, long cbStartOffset, int chunkLength,
+ List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) {
+ // Prepare future cache buffer.
+ LlapMemoryBuffer futureAlloc = cache.createUnallocated();
+ // Add it to result in order we are processing.
+ cacheBuffers.add(futureAlloc);
+ // Add it to the list of work to decompress.
+ long cbEndOffset = cbStartOffset + chunkLength;
+ ProcCacheChunk cc = new ProcCacheChunk(
+ cbStartOffset, cbEndOffset, !isUncompressed, data, futureAlloc);
+ toDecompress.add(cc);
+ // Adjust the compression block position.
+ compressed.position(compressed.position() + chunkLength);
+ // Finally, put it in the ranges list for future use (if shared between RGs).
+ // Before anyone else accesses it, it would have been allocated and decompressed locally.
+ if (compressed.remaining() <= 0) {
+ ranges.set(cc);
+ } else {
+ ranges.previous();
+ ranges.add(cc);
+ ranges.next(); // TODO: This is really stupid.
+ }
+ }
+
}
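As a side note on the header math used in addOneCompressionBuffer() above: ORC's compression-block header is three bytes forming a little-endian value whose low bit flags an "original" (stored uncompressed) block, with the remaining 23 bits carrying the chunk length. A standalone sketch with made-up header bytes:

  // Hedged sketch: decoding a 3-byte ORC compression-block header (sample bytes are
  // illustrative, not taken from any real file).
  int b0 = 0x41, b1 = 0x02, b2 = 0x00;                  // header bytes as read from the stream
  boolean isUncompressed = (b0 & 0x01) == 1;            // low bit set => block stored as-is
  int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1); // remaining 23 bits = length
  // b0 = 0x41 = 0100_0001: low bit 1 (uncompressed), upper 7 bits contribute 32;
  // chunkLength = (0 << 15) | (2 << 7) | 32 = 288 bytes.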
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
Mon Feb 2 22:28:15 2015
@@ -219,14 +219,17 @@ public class OrcInputFormat implements
long offset, long length
) throws IOException {
Reader.Options options = new Reader.Options().range(offset, length);
- boolean isOriginal =
- !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+ boolean isOriginal = isOriginal(file);
List<OrcProto.Type> types = file.getTypes();
options.include(genIncludedColumns(types, conf, isOriginal));
setSearchArgument(options, types, conf, isOriginal);
return file.rowsOptions(options);
}
+ public static boolean isOriginal(Reader file) {
+ return !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+ }
+
/**
* Recurse down into a type subtree turning on all of the sub-columns.
* @param types the types of the file
@@ -278,6 +281,21 @@ public class OrcInputFormat implements
}
}
+ public static String[] getSargColumnNames(String[] originalColumnNames,
+ List<OrcProto.Type> types, boolean[] includedColumns, boolean isOriginal) {
+ int rootColumn = getRootColumn(isOriginal);
+ String[] columnNames = new String[types.size() - rootColumn];
+ int i = 0;
+ for(int columnId: types.get(rootColumn).getSubtypesList()) {
+ if (includedColumns == null || includedColumns[columnId - rootColumn]) {
+ // this is guaranteed to be positive because types only have children
+ // ids greater than their own id.
+ columnNames[columnId - rootColumn] = originalColumnNames[i++];
+ }
+ }
+ return columnNames;
+ }
+
static void setSearchArgument(Reader.Options options,
List<OrcProto.Type> types,
Configuration conf,
@@ -296,19 +314,8 @@ public class OrcInputFormat implements
}
LOG.info("ORC pushdown predicate: " + sarg);
- int rootColumn = getRootColumn(isOriginal);
- String[] neededColumnNames = columnNamesString.split(",");
- String[] columnNames = new String[types.size() - rootColumn];
- boolean[] includedColumns = options.getInclude();
- int i = 0;
- for(int columnId: types.get(rootColumn).getSubtypesList()) {
- if (includedColumns == null || includedColumns[columnId - rootColumn]) {
- // this is guaranteed to be positive because types only have children
- // ids greater than their own id.
- columnNames[columnId - rootColumn] = neededColumnNames[i++];
- }
- }
- options.searchArgument(sarg, columnNames);
+ options.searchArgument(sarg, getSargColumnNames(
+ columnNamesString.split(","), types, options.getInclude(), isOriginal));
}
@Override
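A hedged usage sketch of the two helpers factored out above (isOriginal and getSargColumnNames); the file/conf/sarg/options setup and the column-name string are illustrative assumptions, and genIncludedColumns is assumed to be visible to the caller as it is in createReaderFromFile.

  // Hedged sketch: wiring SARG column names the same way setSearchArgument now does.
  boolean isOriginal = OrcInputFormat.isOriginal(file);
  List<OrcProto.Type> types = file.getTypes();
  boolean[] include = OrcInputFormat.genIncludedColumns(types, conf, isOriginal);
  String[] sargColumns = OrcInputFormat.getSargColumnNames(
      "id,name,amount".split(","), types, include, isOriginal);
  options.include(include);
  options.searchArgument(sarg, sargColumns);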
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1656595&r1=1656594&r2=1656595&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
Mon Feb 2 22:28:15 2015
@@ -18,8 +18,11 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.ColumnEncoding;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.io.api.EncodedColumn;
@@ -82,6 +85,9 @@ public interface RecordReader {
*/
void seekToRow(long rowCount) throws IOException;
+ void prepareEncodedColumnRead() throws IOException;
+
+ // TODO: maybe all of this should be moved to LLAP-specific class
/**
* TODO: this API is subject to change; on one hand, external code should control the threading
* aspects, with ORC method returning one EncodedColumn as it will; on the other, it's
@@ -89,13 +95,18 @@ public interface RecordReader {
* return many EncodedColumn-s.
* TODO: assumes the reader is for one stripe, otherwise the signature makes no sense.
* Also has no columns passed, because that is in ctor.
- * @param colRgs Bitmasks of what RGs are to be read. Has # of elements equal to the number of
- * included columns; then each bitmask is rgCount bits long; 0 means "need to read"
- * @param rgCount The length of bitmasks in colRgs.
- * @param sarg Sarg to apply additional filtering to RGs.
+ * @param colRgs What RGs are to be read. Has # of elements equal to the number of
+ * included columns; then each boolean array is rgCount long.
+ * @param cache Cache to get/put data and allocate memory.
* @param consumer Consumer to pass the results to.
- * @param allocator Allocator to allocate memory.
+ * @throws IOException
*/
- void readEncodedColumns(long[][] colRgs, int rgCount,
- Consumer<EncodedColumn<OrcBatchKey>> consumer, LowLevelCache cache);
+ void readEncodedColumns(int stripeIx, boolean[][] colRgs,
+ LowLevelCache cache, Consumer<EncodedColumn<OrcBatchKey>> consumer) throws IOException;
+
+ RowIndex[] getCurrentRowIndexEntries() throws IOException;
+
+ List<ColumnEncoding> getCurrentColumnEncodings() throws IOException;
+
+ void setRowIndex(RowIndex[] rowIndex);
}
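To make the new signature concrete, a hedged sketch of building the colRgs argument: one boolean[] per included column, each rgCount entries long. The true/false convention below is an assumption (the removed long[] bitmask documented 0 as "need to read"; the new javadoc does not restate it), and stripeIx, cache, and consumer are assumed to come from the surrounding reader setup.

  // Hedged sketch: requesting every row group of every included column.
  boolean[][] colRgs = new boolean[includedColumnCount][];
  for (int col = 0; col < includedColumnCount; ++col) {
    colRgs[col] = new boolean[rgCount];
    java.util.Arrays.fill(colRgs[col], true); // mark all RGs; SARG filtering could clear some
  }
  reader.prepareEncodedColumnRead();
  reader.readEncodedColumns(stripeIx, colRgs, cache, consumer);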