Repository: hive Updated Branches: refs/heads/master 694372770 -> 170637386
HIVE-15799 : LLAP: rename VertorDeserializeOrcWriter (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/17063738 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/17063738 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/17063738 Branch: refs/heads/master Commit: 170637386d0cf2a1c1849dc9eb2af594a6275b50 Parents: 6943727 Author: Sergey Shelukhin <[email protected]> Authored: Mon Feb 13 19:09:13 2017 -0800 Committer: Sergey Shelukhin <[email protected]> Committed: Mon Feb 13 19:09:13 2017 -0800 ---------------------------------------------------------------------- .../llap/io/encoded/SerDeEncodedDataReader.java | 14 +- .../io/encoded/VectorDeserializeOrcWriter.java | 458 +++++++++++++++++++ .../io/encoded/VertorDeserializeOrcWriter.java | 458 ------------------- 3 files changed, 465 insertions(+), 465 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/17063738/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index f6531e8..221c99e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -53,7 +53,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.io.decode.GenericColumnVectorProducer.SerDeStripeMetadata; import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer; import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter; -import org.apache.hadoop.hive.llap.io.encoded.VertorDeserializeOrcWriter.AsyncCallback; +import org.apache.hadoop.hive.llap.io.encoded.VectorDeserializeOrcWriter.AsyncCallback; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.HdfsUtils; @@ -176,7 +176,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> * the consumer, at which point the consumer is responsible for it. */ private FileData cachedData; - private List<VertorDeserializeOrcWriter> asyncWriters = new ArrayList<>(); + private List<VectorDeserializeOrcWriter> asyncWriters = new ArrayList<>(); public SerDeEncodedDataReader(SerDeLowLevelCacheImpl cache, BufferUsageManager bufferManager, Configuration daemonConf, FileSplit split, @@ -1375,15 +1375,15 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> List<Integer> splitColumnIds = OrcInputFormat.genIncludedColumnsReverse( schema, splitIncludes, false); // fileread writes to the writer, which writes to orcWriter, which writes to cacheWriter - EncodingWriter writer = VertorDeserializeOrcWriter.create( + EncodingWriter writer = VectorDeserializeOrcWriter.create( sourceInputFormat, sourceSerDe, parts, daemonConf, jobConf, split.getPath(), originalOi, splitColumnIds, splitIncludes, allocSize); // TODO: move this into ctor? EW would need to create CacheWriter then List<Integer> cwColIds = writer.isOnlyWritingIncludedColumns() ? splitColumnIds : columnIds; writer.init(new CacheWriter(bufferManager, cwColIds, splitIncludes, writer.isOnlyWritingIncludedColumns()), daemonConf, split.getPath()); - if (writer instanceof VertorDeserializeOrcWriter) { - VertorDeserializeOrcWriter asyncWriter = (VertorDeserializeOrcWriter)writer; + if (writer instanceof VectorDeserializeOrcWriter) { + VectorDeserializeOrcWriter asyncWriter = (VectorDeserializeOrcWriter)writer; asyncWriter.startAsync(new AsyncCacheDataCallback()); this.asyncWriters.add(asyncWriter); } @@ -1403,7 +1403,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private class AsyncCacheDataCallback implements AsyncCallback { @Override - public void onComplete(VertorDeserializeOrcWriter writer) { + public void onComplete(VectorDeserializeOrcWriter writer) { CacheWriter cacheWriter = null; try { cacheWriter = writer.getCacheWriter(); @@ -1596,7 +1596,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private void cleanup(boolean isError) { cleanUpCurrentRead(); if (!isError) return; - for (VertorDeserializeOrcWriter asyncWriter : asyncWriters) { + for (VectorDeserializeOrcWriter asyncWriter : asyncWriters) { try { asyncWriter.interrupt(); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/hive/blob/17063738/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java new file mode 100644 index 0000000..c9df7d9 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VectorDeserializeOrcWriter.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.io.encoded; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter; +import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserializerOrcWriter; +import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.EncodingWriter; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.io.orc.Writer; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.io.BinaryComparable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.TextInputFormat; + +/** The class that writes rows from a text reader to an ORC writer using VectorDeserializeRow. */ +class VectorDeserializeOrcWriter extends EncodingWriter implements Runnable { + private final VectorizedRowBatchCtx vrbCtx; + private Writer orcWriter; + private final LazySimpleDeserializeRead deserializeRead; + private final VectorDeserializeRow<?> vectorDeserializeRow; + private final StructObjectInspector destinationOi; + private final boolean usesSourceIncludes; + private final List<Integer> sourceIncludes; + + private final boolean isAsync; + private final Thread orcThread; + private final ConcurrentLinkedQueue<WriteOperation> queue; + private AsyncCallback completion; + + // Stored here only as async operation context. + private final boolean[] cacheIncludes; + + private VectorizedRowBatch sourceBatch, destinationBatch; + private List<VectorizedRowBatch> currentBatches; + + // TODO: if more writers are added, separate out an EncodingWriterFactory + public static EncodingWriter create(InputFormat<?, ?> sourceIf, Deserializer serDe, + Map<Path, PartitionDesc> parts, Configuration daemonConf, Configuration jobConf, + Path splitPath, StructObjectInspector sourceOi, List<Integer> sourceIncludes, + boolean[] cacheIncludes, int allocSize) throws IOException { + // Vector SerDe can be disabled both on client and server side. + if (!HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED) + || !HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED) + || !(sourceIf instanceof TextInputFormat) || !(serDe instanceof LazySimpleSerDe)) { + return new DeserializerOrcWriter(serDe, sourceOi, allocSize); + } + Path path = splitPath.getFileSystem(daemonConf).makeQualified(splitPath); + PartitionDesc partDesc = HiveFileFormatUtils.getPartitionDescFromPathRecursively( + parts, path, null); + if (partDesc == null) { + LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: no partition desc for " + path); + return new DeserializerOrcWriter(serDe, sourceOi, allocSize); + } + Properties tblProps = partDesc.getTableDesc().getProperties(); + if ("true".equalsIgnoreCase(tblProps.getProperty( + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST))) { + LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter due to " + + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST); + return new DeserializerOrcWriter(serDe, sourceOi, allocSize); + } + for (StructField sf : sourceOi.getAllStructFieldRefs()) { + Category c = sf.getFieldObjectInspector().getCategory(); + if (c != Category.PRIMITIVE) { + LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: " + c + " is not supported"); + return new DeserializerOrcWriter(serDe, sourceOi, allocSize); + } + } + LlapIoImpl.LOG.info("Creating VertorDeserializeOrcWriter for " + path); + return new VectorDeserializeOrcWriter( + daemonConf, tblProps, sourceOi, sourceIncludes, cacheIncludes, allocSize); + } + + private VectorDeserializeOrcWriter(Configuration conf, Properties tblProps, + StructObjectInspector sourceOi, List<Integer> sourceIncludes, boolean[] cacheIncludes, + int allocSize) throws IOException { + super(sourceOi, allocSize); + // See also: the usage of VectorDeserializeType, for binary. For now, we only want text. + this.vrbCtx = createVrbCtx(sourceOi); + this.sourceIncludes = sourceIncludes; + this.cacheIncludes = cacheIncludes; + this.sourceBatch = vrbCtx.createVectorizedRowBatch(); + deserializeRead = new LazySimpleDeserializeRead(vrbCtx.getRowColumnTypeInfos(), + /* useExternalBuffer */ true, createSerdeParams(conf, tblProps)); + vectorDeserializeRow = new VectorDeserializeRow<LazySimpleDeserializeRead>(deserializeRead); + int colCount = vrbCtx.getRowColumnTypeInfos().length; + boolean[] includes = null; + this.usesSourceIncludes = sourceIncludes.size() < colCount; + if (usesSourceIncludes) { + // VectorDeserializeRow produces "sparse" VRB when includes are used; we need to write the + // "dense" VRB to ORC. Ideally, we'd use projection columns, but ORC writer doesn't use them. + // In any case, we would also need to build a new OI for OrcWriter config. + // This is why OrcWriter is created after this writer, by the way. + this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size()); + includes = new boolean[colCount]; + int inclBatchIx = 0; + List<String> childNames = new ArrayList<>(sourceIncludes.size()); + List<ObjectInspector> childOis = new ArrayList<>(sourceIncludes.size()); + List<? extends StructField> sourceFields = sourceOi.getAllStructFieldRefs(); + for (Integer columnId : sourceIncludes) { + includes[columnId] = true; + assert inclBatchIx <= columnId; + // Note that we use the same vectors in both batches. Clever, very clever. + destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId]; + StructField sourceField = sourceFields.get(columnId); + childNames.add(sourceField.getFieldName()); + childOis.add(sourceField.getFieldObjectInspector()); + } + // This is only used by ORC to derive the structure. Most fields are unused. + destinationOi = new LazySimpleStructObjectInspector( + childNames, childOis, null, (byte)0, null); + destinationBatch.setPartitionInfo(sourceIncludes.size(), 0); + if (LlapIoImpl.LOG.isDebugEnabled()) { + LlapIoImpl.LOG.debug("Includes for deserializer are " + DebugUtils.toString(includes)); + } + try { + vectorDeserializeRow.init(includes); + } catch (HiveException e) { + throw new IOException(e); + } + } else { + // No includes - use the standard batch. + this.destinationBatch = sourceBatch; + this.destinationOi = sourceOi; + try { + vectorDeserializeRow.init(); + } catch (HiveException e) { + throw new IOException(e); + } + } + this.isAsync = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED); + if (isAsync) { + currentBatches = new LinkedList<>(); + queue = new ConcurrentLinkedQueue<>(); + orcThread = new Thread(this); + orcThread.setDaemon(true); + orcThread.setName(Thread.currentThread().getName() + "-OrcEncode"); + } else { + queue = null; + orcThread = null; + currentBatches = null; + } + } + + public void startAsync(AsyncCallback callback) { + this.completion = callback; + this.orcThread.start(); + } + + private static VectorizedRowBatchCtx createVrbCtx(StructObjectInspector oi) throws IOException { + VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx(); + try { + vrbCtx.init(oi, new String[0]); + } catch (HiveException e) { + throw new IOException(e); + } + return vrbCtx; + } + + private static LazySerDeParameters createSerdeParams( + Configuration conf, Properties tblProps) throws IOException { + try { + return new LazySerDeParameters(conf, tblProps, LazySimpleSerDe.class.getName()); + } catch (SerDeException e) { + throw new IOException(e); + } + } + + @Override + public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException { + this.orcWriter = super.createOrcWriter(cacheWriter, conf, path, destinationOi); + this.cacheWriter = cacheWriter; + } + + public interface AsyncCallback { + void onComplete(VectorDeserializeOrcWriter writer); + } + + @Override + public void run() { + while (true) { + WriteOperation op = null; + int fallbackMs = 8; + while (true) { + op = queue.poll(); + if (op != null) break; + if (fallbackMs > 262144) { // Arbitrary... we don't expect caller to hang out for 7+ mins. + LlapIoImpl.LOG.error("ORC encoder timed out waiting for input"); + discardData(); + return; + } + try { + Thread.sleep(fallbackMs); + } catch (InterruptedException e) { + LlapIoImpl.LOG.error("ORC encoder interrupted waiting for input"); + discardData(); + return; + } + fallbackMs <<= 1; + } + try { + if (op.apply(orcWriter, cacheWriter)) { + LlapIoImpl.LOG.info("ORC encoder received a exit event"); + completion.onComplete(this); + return; + } + } catch (Exception e) { + LlapIoImpl.LOG.error("ORC encoder failed", e); + discardData(); + return; + } + } + } + + private void discardData() { + try { + cacheWriter.discardData(); + } catch (Exception ex) { + LlapIoImpl.LOG.error("Failed to close an async cache writer", ex); + } + } + + @Override + public void writeOneRow(Writable row) throws IOException { + if (sourceBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + flushBatch(); + } + + BinaryComparable binComp = (BinaryComparable)row; + deserializeRead.set(binComp.getBytes(), 0, binComp.getLength()); + + // Deserialize and append new row using the current batch size as the index. + try { + // Not using ByRef now since it's unsafe for text readers. Might be safe for others. + vectorDeserializeRow.deserialize(sourceBatch, sourceBatch.size++); + } catch (Exception e) { + throw new IOException("DeserializeRead detail: " + + vectorDeserializeRow.getDetailedReadPositionString(), e); + } + } + + private void flushBatch() throws IOException { + addBatchToWriter(); + if (!isAsync) { + for (int c = 0; c < sourceBatch.cols.length; ++c) { + // This resets vectors in both batches. + ColumnVector colVector = sourceBatch.cols[c]; + if (colVector != null) { + colVector.reset(); + colVector.init(); + } + } + sourceBatch.selectedInUse = false; + sourceBatch.size = 0; + sourceBatch.endOfFile = false; + propagateSourceBatchFieldsToDest(); + } else { + // In addBatchToWriter, we have passed the batch to both ORC and operator pipeline + // (neither ever changes the vectors). We'd need a set of vectors batch to write to. + // TODO: for now, create this from scratch. Ideally we should return the vectors from ops. + // We could also have the ORC thread create it for us in its spare time... + this.sourceBatch = vrbCtx.createVectorizedRowBatch(); + if (usesSourceIncludes) { + this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size()); + int inclBatchIx = 0; + for (Integer columnId : sourceIncludes) { + destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId]; + } + destinationBatch.setPartitionInfo(sourceIncludes.size(), 0); + } else { + this.destinationBatch = sourceBatch; + } + } + } + + private void propagateSourceBatchFieldsToDest() { + if (destinationBatch == sourceBatch) return; + destinationBatch.selectedInUse = sourceBatch.selectedInUse; + destinationBatch.size = sourceBatch.size; + destinationBatch.endOfFile = sourceBatch.endOfFile; + } + + private void addBatchToWriter() throws IOException { + propagateSourceBatchFieldsToDest(); + if (!isAsync) { + orcWriter.addRowBatch(destinationBatch); + } else { + currentBatches.add(destinationBatch); + addWriteOp(new VrbOperation(destinationBatch)); + } + } + + @Override + public void flushIntermediateData() throws IOException { + if (sourceBatch.size > 0) { + flushBatch(); + } + } + + @Override + public void writeIntermediateFooter() throws IOException { + if (isAsync) { + addWriteOp(new IntermediateFooterOperation()); + } else { + orcWriter.writeIntermediateFooter(); + } + } + + private void addWriteOp(WriteOperation wo) throws AssertionError { + if (queue.offer(wo)) return; + throw new AssertionError("Queue full"); // This should never happen with linked list queue. + } + + @Override + public void setCurrentStripeOffsets(long currentKnownTornStart, + long firstStartOffset, long lastStartOffset, long fileOffset) { + if (isAsync) { + addWriteOp(new SetStripeDataOperation( + currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset)); + } else { + cacheWriter.setCurrentStripeOffsets( + currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset); + } + } + + @Override + public void close() throws IOException { + if (sourceBatch.size > 0) { + addBatchToWriter(); + } + if (!isAsync) { + orcWriter.close(); + } else { + addWriteOp(new CloseOperation()); + } + } + + public List<VectorizedRowBatch> extractCurrentVrbs() { + if (!isAsync) return null; + List<VectorizedRowBatch> result = currentBatches; + currentBatches = new LinkedList<>(); + return result; + } + + private static interface WriteOperation { + boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException; + } + + private static class VrbOperation implements WriteOperation { + private VectorizedRowBatch batch; + + public VrbOperation(VectorizedRowBatch batch) { + // LlapIoImpl.LOG.debug("Adding batch " + batch); + this.batch = batch; + } + + @Override + public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { + // LlapIoImpl.LOG.debug("Writing batch " + batch); + writer.addRowBatch(batch); + return false; + } + } + + private static class IntermediateFooterOperation implements WriteOperation { + @Override + public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { + writer.writeIntermediateFooter(); + return false; + } + } + + private static class SetStripeDataOperation implements WriteOperation { + private final long currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset; + public SetStripeDataOperation(long currentKnownTornStart, + long firstStartOffset, long lastStartOffset, long fileOffset) { + this.currentKnownTornStart = currentKnownTornStart; + this.firstStartOffset = firstStartOffset; + this.lastStartOffset = lastStartOffset; + this.fileOffset = fileOffset; + } + + @Override + public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { + cacheWriter.setCurrentStripeOffsets( + currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset); + return false; + } + } + + private static class CloseOperation implements WriteOperation { + @Override + public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { + writer.close(); + return true; // The thread should stop after this. + } + } + + public boolean[] getOriginalCacheIncludes() { + return cacheIncludes; + } + + @Override + public boolean isOnlyWritingIncludedColumns() { + return usesSourceIncludes; + } + + public void interrupt() { + assert orcThread != null; + orcThread.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/17063738/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java deleted file mode 100644 index 86d9ecc..0000000 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java +++ /dev/null @@ -1,458 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.llap.io.encoded; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.llap.DebugUtils; -import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; -import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.CacheWriter; -import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.DeserializerOrcWriter; -import org.apache.hadoop.hive.llap.io.encoded.SerDeEncodedDataReader.EncodingWriter; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; -import org.apache.hadoop.hive.ql.io.orc.Writer; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; -import org.apache.hadoop.hive.serde2.lazy.fast.LazySimpleDeserializeRead; -import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.io.BinaryComparable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.TextInputFormat; - -/** The class that writes rows from a text reader to an ORC writer using VectorDeserializeRow. */ -class VertorDeserializeOrcWriter extends EncodingWriter implements Runnable { - private final VectorizedRowBatchCtx vrbCtx; - private Writer orcWriter; - private final LazySimpleDeserializeRead deserializeRead; - private final VectorDeserializeRow<?> vectorDeserializeRow; - private final StructObjectInspector destinationOi; - private final boolean usesSourceIncludes; - private final List<Integer> sourceIncludes; - - private final boolean isAsync; - private final Thread orcThread; - private final ConcurrentLinkedQueue<WriteOperation> queue; - private AsyncCallback completion; - - // Stored here only as async operation context. - private final boolean[] cacheIncludes; - - private VectorizedRowBatch sourceBatch, destinationBatch; - private List<VectorizedRowBatch> currentBatches; - - // TODO: if more writers are added, separate out an EncodingWriterFactory - public static EncodingWriter create(InputFormat<?, ?> sourceIf, Deserializer serDe, - Map<Path, PartitionDesc> parts, Configuration daemonConf, Configuration jobConf, - Path splitPath, StructObjectInspector sourceOi, List<Integer> sourceIncludes, - boolean[] cacheIncludes, int allocSize) throws IOException { - // Vector SerDe can be disabled both on client and server side. - if (!HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED) - || !HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ENABLED) - || !(sourceIf instanceof TextInputFormat) || !(serDe instanceof LazySimpleSerDe)) { - return new DeserializerOrcWriter(serDe, sourceOi, allocSize); - } - Path path = splitPath.getFileSystem(daemonConf).makeQualified(splitPath); - PartitionDesc partDesc = HiveFileFormatUtils.getPartitionDescFromPathRecursively( - parts, path, null); - if (partDesc == null) { - LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: no partition desc for " + path); - return new DeserializerOrcWriter(serDe, sourceOi, allocSize); - } - Properties tblProps = partDesc.getTableDesc().getProperties(); - if ("true".equalsIgnoreCase(tblProps.getProperty( - serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST))) { - LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter due to " - + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST); - return new DeserializerOrcWriter(serDe, sourceOi, allocSize); - } - for (StructField sf : sourceOi.getAllStructFieldRefs()) { - Category c = sf.getFieldObjectInspector().getCategory(); - if (c != Category.PRIMITIVE) { - LlapIoImpl.LOG.info("Not using VertorDeserializeOrcWriter: " + c + " is not supported"); - return new DeserializerOrcWriter(serDe, sourceOi, allocSize); - } - } - LlapIoImpl.LOG.info("Creating VertorDeserializeOrcWriter for " + path); - return new VertorDeserializeOrcWriter( - daemonConf, tblProps, sourceOi, sourceIncludes, cacheIncludes, allocSize); - } - - private VertorDeserializeOrcWriter(Configuration conf, Properties tblProps, - StructObjectInspector sourceOi, List<Integer> sourceIncludes, boolean[] cacheIncludes, - int allocSize) throws IOException { - super(sourceOi, allocSize); - // See also: the usage of VectorDeserializeType, for binary. For now, we only want text. - this.vrbCtx = createVrbCtx(sourceOi); - this.sourceIncludes = sourceIncludes; - this.cacheIncludes = cacheIncludes; - this.sourceBatch = vrbCtx.createVectorizedRowBatch(); - deserializeRead = new LazySimpleDeserializeRead(vrbCtx.getRowColumnTypeInfos(), - /* useExternalBuffer */ true, createSerdeParams(conf, tblProps)); - vectorDeserializeRow = new VectorDeserializeRow<LazySimpleDeserializeRead>(deserializeRead); - int colCount = vrbCtx.getRowColumnTypeInfos().length; - boolean[] includes = null; - this.usesSourceIncludes = sourceIncludes.size() < colCount; - if (usesSourceIncludes) { - // VectorDeserializeRow produces "sparse" VRB when includes are used; we need to write the - // "dense" VRB to ORC. Ideally, we'd use projection columns, but ORC writer doesn't use them. - // In any case, we would also need to build a new OI for OrcWriter config. - // This is why OrcWriter is created after this writer, by the way. - this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size()); - includes = new boolean[colCount]; - int inclBatchIx = 0; - List<String> childNames = new ArrayList<>(sourceIncludes.size()); - List<ObjectInspector> childOis = new ArrayList<>(sourceIncludes.size()); - List<? extends StructField> sourceFields = sourceOi.getAllStructFieldRefs(); - for (Integer columnId : sourceIncludes) { - includes[columnId] = true; - assert inclBatchIx <= columnId; - // Note that we use the same vectors in both batches. Clever, very clever. - destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId]; - StructField sourceField = sourceFields.get(columnId); - childNames.add(sourceField.getFieldName()); - childOis.add(sourceField.getFieldObjectInspector()); - } - // This is only used by ORC to derive the structure. Most fields are unused. - destinationOi = new LazySimpleStructObjectInspector( - childNames, childOis, null, (byte)0, null); - destinationBatch.setPartitionInfo(sourceIncludes.size(), 0); - if (LlapIoImpl.LOG.isDebugEnabled()) { - LlapIoImpl.LOG.debug("Includes for deserializer are " + DebugUtils.toString(includes)); - } - try { - vectorDeserializeRow.init(includes); - } catch (HiveException e) { - throw new IOException(e); - } - } else { - // No includes - use the standard batch. - this.destinationBatch = sourceBatch; - this.destinationOi = sourceOi; - try { - vectorDeserializeRow.init(); - } catch (HiveException e) { - throw new IOException(e); - } - } - this.isAsync = HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_ENCODE_VECTOR_SERDE_ASYNC_ENABLED); - if (isAsync) { - currentBatches = new LinkedList<>(); - queue = new ConcurrentLinkedQueue<>(); - orcThread = new Thread(this); - orcThread.setDaemon(true); - orcThread.setName(Thread.currentThread().getName() + "-OrcEncode"); - } else { - queue = null; - orcThread = null; - currentBatches = null; - } - } - - public void startAsync(AsyncCallback callback) { - this.completion = callback; - this.orcThread.start(); - } - - private static VectorizedRowBatchCtx createVrbCtx(StructObjectInspector oi) throws IOException { - VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx(); - try { - vrbCtx.init(oi, new String[0]); - } catch (HiveException e) { - throw new IOException(e); - } - return vrbCtx; - } - - private static LazySerDeParameters createSerdeParams( - Configuration conf, Properties tblProps) throws IOException { - try { - return new LazySerDeParameters(conf, tblProps, LazySimpleSerDe.class.getName()); - } catch (SerDeException e) { - throw new IOException(e); - } - } - - @Override - public void init(CacheWriter cacheWriter, Configuration conf, Path path) throws IOException { - this.orcWriter = super.createOrcWriter(cacheWriter, conf, path, destinationOi); - this.cacheWriter = cacheWriter; - } - - public interface AsyncCallback { - void onComplete(VertorDeserializeOrcWriter writer); - } - - @Override - public void run() { - while (true) { - WriteOperation op = null; - int fallbackMs = 8; - while (true) { - op = queue.poll(); - if (op != null) break; - if (fallbackMs > 262144) { // Arbitrary... we don't expect caller to hang out for 7+ mins. - LlapIoImpl.LOG.error("ORC encoder timed out waiting for input"); - discardData(); - return; - } - try { - Thread.sleep(fallbackMs); - } catch (InterruptedException e) { - LlapIoImpl.LOG.error("ORC encoder interrupted waiting for input"); - discardData(); - return; - } - fallbackMs <<= 1; - } - try { - if (op.apply(orcWriter, cacheWriter)) { - LlapIoImpl.LOG.info("ORC encoder received a exit event"); - completion.onComplete(this); - return; - } - } catch (Exception e) { - LlapIoImpl.LOG.error("ORC encoder failed", e); - discardData(); - return; - } - } - } - - private void discardData() { - try { - cacheWriter.discardData(); - } catch (Exception ex) { - LlapIoImpl.LOG.error("Failed to close an async cache writer", ex); - } - } - - @Override - public void writeOneRow(Writable row) throws IOException { - if (sourceBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { - flushBatch(); - } - - BinaryComparable binComp = (BinaryComparable)row; - deserializeRead.set(binComp.getBytes(), 0, binComp.getLength()); - - // Deserialize and append new row using the current batch size as the index. - try { - // Not using ByRef now since it's unsafe for text readers. Might be safe for others. - vectorDeserializeRow.deserialize(sourceBatch, sourceBatch.size++); - } catch (Exception e) { - throw new IOException("DeserializeRead detail: " - + vectorDeserializeRow.getDetailedReadPositionString(), e); - } - } - - private void flushBatch() throws IOException { - addBatchToWriter(); - if (!isAsync) { - for (int c = 0; c < sourceBatch.cols.length; ++c) { - // This resets vectors in both batches. - ColumnVector colVector = sourceBatch.cols[c]; - if (colVector != null) { - colVector.reset(); - colVector.init(); - } - } - sourceBatch.selectedInUse = false; - sourceBatch.size = 0; - sourceBatch.endOfFile = false; - propagateSourceBatchFieldsToDest(); - } else { - // In addBatchToWriter, we have passed the batch to both ORC and operator pipeline - // (neither ever changes the vectors). We'd need a set of vectors batch to write to. - // TODO: for now, create this from scratch. Ideally we should return the vectors from ops. - // We could also have the ORC thread create it for us in its spare time... - this.sourceBatch = vrbCtx.createVectorizedRowBatch(); - if (usesSourceIncludes) { - this.destinationBatch = new VectorizedRowBatch(sourceIncludes.size()); - int inclBatchIx = 0; - for (Integer columnId : sourceIncludes) { - destinationBatch.cols[inclBatchIx++] = sourceBatch.cols[columnId]; - } - destinationBatch.setPartitionInfo(sourceIncludes.size(), 0); - } else { - this.destinationBatch = sourceBatch; - } - } - } - - private void propagateSourceBatchFieldsToDest() { - if (destinationBatch == sourceBatch) return; - destinationBatch.selectedInUse = sourceBatch.selectedInUse; - destinationBatch.size = sourceBatch.size; - destinationBatch.endOfFile = sourceBatch.endOfFile; - } - - private void addBatchToWriter() throws IOException { - propagateSourceBatchFieldsToDest(); - if (!isAsync) { - orcWriter.addRowBatch(destinationBatch); - } else { - currentBatches.add(destinationBatch); - addWriteOp(new VrbOperation(destinationBatch)); - } - } - - @Override - public void flushIntermediateData() throws IOException { - if (sourceBatch.size > 0) { - flushBatch(); - } - } - - @Override - public void writeIntermediateFooter() throws IOException { - if (isAsync) { - addWriteOp(new IntermediateFooterOperation()); - } else { - orcWriter.writeIntermediateFooter(); - } - } - - private void addWriteOp(WriteOperation wo) throws AssertionError { - if (queue.offer(wo)) return; - throw new AssertionError("Queue full"); // This should never happen with linked list queue. - } - - @Override - public void setCurrentStripeOffsets(long currentKnownTornStart, - long firstStartOffset, long lastStartOffset, long fileOffset) { - if (isAsync) { - addWriteOp(new SetStripeDataOperation( - currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset)); - } else { - cacheWriter.setCurrentStripeOffsets( - currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset); - } - } - - @Override - public void close() throws IOException { - if (sourceBatch.size > 0) { - addBatchToWriter(); - } - if (!isAsync) { - orcWriter.close(); - } else { - addWriteOp(new CloseOperation()); - } - } - - public List<VectorizedRowBatch> extractCurrentVrbs() { - if (!isAsync) return null; - List<VectorizedRowBatch> result = currentBatches; - currentBatches = new LinkedList<>(); - return result; - } - - private static interface WriteOperation { - boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException; - } - - private static class VrbOperation implements WriteOperation { - private VectorizedRowBatch batch; - - public VrbOperation(VectorizedRowBatch batch) { - // LlapIoImpl.LOG.debug("Adding batch " + batch); - this.batch = batch; - } - - @Override - public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { - // LlapIoImpl.LOG.debug("Writing batch " + batch); - writer.addRowBatch(batch); - return false; - } - } - - private static class IntermediateFooterOperation implements WriteOperation { - @Override - public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { - writer.writeIntermediateFooter(); - return false; - } - } - - private static class SetStripeDataOperation implements WriteOperation { - private final long currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset; - public SetStripeDataOperation(long currentKnownTornStart, - long firstStartOffset, long lastStartOffset, long fileOffset) { - this.currentKnownTornStart = currentKnownTornStart; - this.firstStartOffset = firstStartOffset; - this.lastStartOffset = lastStartOffset; - this.fileOffset = fileOffset; - } - - @Override - public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { - cacheWriter.setCurrentStripeOffsets( - currentKnownTornStart, firstStartOffset, lastStartOffset, fileOffset); - return false; - } - } - - private static class CloseOperation implements WriteOperation { - @Override - public boolean apply(Writer writer, CacheWriter cacheWriter) throws IOException { - writer.close(); - return true; // The thread should stop after this. - } - } - - public boolean[] getOriginalCacheIncludes() { - return cacheIncludes; - } - - @Override - public boolean isOnlyWritingIncludedColumns() { - return usesSourceIncludes; - } - - public void interrupt() { - assert orcThread != null; - orcThread.interrupt(); - } -}
