http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index 0b1ac4b..b018adb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -37,6 +37,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.google.common.annotations.VisibleForTesting; + import org.apache.hadoop.hive.common.StringInternUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.Driver.DriverState; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -363,8 +366,12 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ Map<CombinePathInputFormat, CombineFilter> poolMap = new HashMap<CombinePathInputFormat, CombineFilter>(); Set<Path> poolSet = new HashSet<Path>(); + LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState(); for (Path path : paths) { + if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) + throw new IOException("Operation is Canceled. "); + PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively( pathToPartitionInfo, path, IOPrepareCache.get().allocatePartitionDescMap()); TableDesc tableDesc = part.getTableDesc();
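The CombineHiveInputFormat hunk above makes split generation cancellable: the loop over input paths now polls the Driver's shared LockedDriverState and aborts with an IOException once the state flips to INTERRUPT. Below is a minimal, self-contained sketch of that cooperative-cancellation pattern; the DriverState/LockedDriverState shapes and the loop body are illustrative stand-ins, not the actual Hive class definitions.

import java.io.IOException;
import java.util.List;

class SplitGenerationSketch {
  enum DriverState { COMPILING, EXECUTING, INTERRUPT }

  // Stand-in for Driver.LockedDriverState: a mutable, thread-visible state object that the
  // Driver updates and long-running worker code periodically inspects.
  static class LockedDriverState {
    volatile DriverState driverState = DriverState.EXECUTING;
    private static final ThreadLocal<LockedDriverState> CURRENT = new ThreadLocal<>();
    static LockedDriverState getLockedDriverState() { return CURRENT.get(); }
  }

  void generateSplits(List<String> paths) throws IOException {
    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
    for (String path : paths) {
      // Check once per path so a cancelled query stops early instead of finishing the loop.
      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
        throw new IOException("Operation is Canceled.");
      }
      // ... per-path partition lookup and split computation would go here ...
    }
  }
}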
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index cc77e4c..f73a8e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -87,7 +87,7 @@ public final class HiveFileFormatUtils { public static class FileChecker { // we don't have many file formats that implement InputFormatChecker. We won't be holding // multiple instances of such classes - private static int MAX_CACHE_SIZE = 16; + private static final int MAX_CACHE_SIZE = 16; // immutable maps Map<Class<? extends InputFormat>, Class<? extends InputFormatChecker>> inputFormatCheckerMap; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index c697407..9b83cb4 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -89,14 +89,13 @@ import org.apache.hive.common.util.ReflectionUtil; */ public class HiveInputFormat<K extends WritableComparable, V extends Writable> implements InputFormat<K, V>, JobConfigurable { - private static final String CLASS_NAME = HiveInputFormat.class.getName(); private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); /** * A cache of InputFormat instances. 
*/ - private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats + private static final Map<Class, InputFormat<WritableComparable, Writable>> inputFormats = new ConcurrentHashMap<Class, InputFormat<WritableComparable, Writable>>(); private JobConf job; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java index d391164..f41edc4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java @@ -839,7 +839,7 @@ public class RCFile { // the max size of memory for buffering records before writes them out private int columnsBufferSize = 4 * 1024 * 1024; // 4M // the conf string for COLUMNS_BUFFER_SIZE - public static String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size"; + public static final String COLUMNS_BUFFER_SIZE_CONF_STR = "hive.io.rcfile.record.buffer.size"; // how many records already buffered private int bufferedRecords = 0; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index 96ca736..cbd38ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.orc.FileMetadata; import org.apache.orc.PhysicalWriter; -import org.apache.orc.impl.MemoryManager; +import org.apache.orc.MemoryManager; import org.apache.orc.TypeDescription; import org.apache.orc.impl.OrcTail; @@ -258,7 +258,7 @@ public final class OrcFile extends org.apache.orc.OrcFile { /** * A package local option to set the memory manager. 
*/ - protected WriterOptions memory(MemoryManager value) { + public WriterOptions memory(MemoryManager value) { super.memory(value); return this; } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 59682db..8fb7211 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -158,7 +158,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } private static final Logger LOG = LoggerFactory.getLogger(OrcInputFormat.class); - private static boolean isDebugEnabled = LOG.isDebugEnabled(); + private static final boolean isDebugEnabled = LOG.isDebugEnabled(); static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024; @@ -1531,7 +1531,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, Reader.Options readerOptions = new Reader.Options(context.conf); if (readerTypes == null) { readerIncluded = genIncludedColumns(fileSchema, context.conf); - evolution = new SchemaEvolution(fileSchema, readerOptions.include(readerIncluded)); + evolution = new SchemaEvolution(fileSchema, null, readerOptions.include(readerIncluded)); } else { // The reader schema always comes in without ACID columns. TypeDescription readerSchema = OrcUtils.convertTypeFromProtobuf(readerTypes, 0); @@ -1913,10 +1913,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } } - // The schema type description does not include the ACID fields (i.e. it is the - // non-ACID original schema). - private static boolean SCHEMA_TYPES_IS_ORIGINAL = true; - @Override public RowReader<OrcStruct> getReader(InputSplit inputSplit, Options options) http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 0ac3ec5..5b2e9b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hive.ql.io.orc.encoded; import java.io.IOException; +import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.IdentityHashMap; import java.util.List; import org.slf4j.Logger; @@ -43,11 +46,13 @@ import org.apache.orc.impl.RecordReaderUtils; import org.apache.orc.impl.StreamName; import org.apache.orc.StripeInformation; import org.apache.orc.impl.BufferChunk; -import org.apache.hadoop.hive.llap.DebugUtils; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory; import org.apache.orc.OrcProto; +import sun.misc.Cleaner; + + /** * Encoded reader implementation. 
* @@ -80,6 +85,17 @@ import org.apache.orc.OrcProto; */ class EncodedReaderImpl implements EncodedReader { public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class); + private static Field cleanerField; + static { + try { + // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760 + final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer"); + cleanerField = dbClazz.getDeclaredField("cleaner"); + cleanerField.setAccessible(true); + } catch (Throwable t) { + cleanerField = null; + } + } private static final Object POOLS_CREATION_LOCK = new Object(); private static Pools POOLS; private static class Pools { @@ -204,8 +220,8 @@ class EncodedReaderImpl implements EncodedReader { @Override public void readEncodedColumns(int stripeIx, StripeInformation stripe, - OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, List<OrcProto.Stream> streamList, - boolean[] included, boolean[][] colRgs, + OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, + List<OrcProto.Stream> streamList, boolean[] included, boolean[][] colRgs, Consumer<OrcEncodedColumnBatch> consumer) throws IOException { // Note: for now we don't have to setError here, caller will setError if we throw. // We are also not supposed to call setDone, since we are only part of the operation. @@ -303,15 +319,35 @@ class EncodedReaderImpl implements EncodedReader { } } + // TODO: the memory release could be optimized - we could release original buffers after we + // are fully done with each original buffer from disk. For now release all at the end; + // it doesn't increase the total amount of memory we hold, just the duration a bit. + // This is much simpler - we can just remember original ranges after reading them, and + // release them at the end. In a few cases where it's easy to determine that a buffer + // can be freed in advance, we remove it from the map. + IdentityHashMap<ByteBuffer, Boolean> toRelease = null; if (!isAllInCache.value) { if (!isDataReaderOpen) { this.dataReader.open(); isDataReaderOpen = true; } dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc()); + toRelease = new IdentityHashMap<>(); + DiskRangeList drl = toRead.next; + while (drl != null) { + if (drl instanceof BufferChunk) { + toRelease.put(drl.getData(), true); + } + drl = drl.next; + } } // 3. For uncompressed case, we need some special processing before read. + // Basically, we are trying to create artificial, consistent ranges to cache, as there are + // no CBs in an uncompressed file. At the end of this processing, the list would contain + // either cache buffers, or buffers allocated by us and not cached (if we are only reading + // parts of the data for some ranges and don't want to cache it). Both are represented by + // CacheChunks, so the list is just CacheChunk-s from that point on. DiskRangeList iter = toRead.next; // Keep "toRead" list for future use, don't extract(). if (codec == null) { for (int colIx = 0; colIx < colCtxs.length; ++colIx) { @@ -326,6 +362,12 @@ class EncodedReaderImpl implements EncodedReader { } } } + // Release buffers as we are done with all the streams... also see toRelease comment.\ + // With uncompressed streams, we know we are done earlier. 
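// Illustrative sketch, not part of the patch: the readEncodedColumns() change above collects the
// raw disk-read buffers in an IdentityHashMap<ByteBuffer, Boolean> rather than a HashSet because
// ByteBuffer.equals()/hashCode() compare contents; two distinct buffers holding equal bytes would
// collide, while identity semantics guarantee each physical buffer is released exactly once.
// The free() hook below is a hypothetical stand-in for whatever actually releases a buffer.
import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.function.Consumer;

class BufferTrackingSketch {
  private final IdentityHashMap<ByteBuffer, Boolean> toRelease = new IdentityHashMap<>();
  private final Consumer<ByteBuffer> free;

  BufferTrackingSketch(Consumer<ByteBuffer> free) { this.free = free; }

  void track(ByteBuffer buf) {
    toRelease.put(buf, Boolean.TRUE);      // keyed by reference, not by content
  }

  void releaseIfStillTracked(ByteBuffer buf) {
    if (toRelease.remove(buf) != null) {   // remove() also matches by reference
      free.accept(buf);
    }
  }

  void releaseAll() {
    toRelease.keySet().forEach(free);      // free whatever was not released early
    toRelease.clear();
  }
}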
+ if (toRelease != null) { + releaseBuffers(toRelease.keySet(), true); + toRelease = null; + } if (isTracingEnabled) { LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset " + stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next)); @@ -374,8 +416,8 @@ class EncodedReaderImpl implements EncodedReader { if (sctx.stripeLevelStream == null) { sctx.stripeLevelStream = POOLS.csdPool.take(); // We will be using this for each RG while also sending RGs to processing. - // To avoid buffers being unlocked, run refcount one ahead; we will not increase - // it when building the last RG, so each RG processing will decref once, and the + // To avoid buffers being unlocked, run refcount one ahead; so each RG + // processing will decref once, and the // last one will unlock the buffers. sctx.stripeLevelStream.incRef(); // For stripe-level streams we don't need the extra refcount on the block. @@ -383,14 +425,12 @@ class EncodedReaderImpl implements EncodedReader { long unlockUntilCOffset = sctx.offset + sctx.length; DiskRangeList lastCached = readEncodedStream(stripeOffset, iter, sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream, - unlockUntilCOffset, sctx.offset); + unlockUntilCOffset, sctx.offset, toRelease); if (lastCached != null) { iter = lastCached; } } - if (!isLastRg) { - sctx.stripeLevelStream.incRef(); - } + sctx.stripeLevelStream.incRef(); cb = sctx.stripeLevelStream; } else { // This stream can be separated by RG using index. Let's do that. @@ -411,7 +451,7 @@ class EncodedReaderImpl implements EncodedReader { boolean isStartOfStream = sctx.bufferIter == null; DiskRangeList lastCached = readEncodedStream(stripeOffset, (isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb, - unlockUntilCOffset, sctx.offset); + unlockUntilCOffset, sctx.offset, toRelease); if (lastCached != null) { sctx.bufferIter = iter = lastCached; } @@ -437,7 +477,27 @@ class EncodedReaderImpl implements EncodedReader { } // Release the unreleased buffers. See class comment about refcounts. + for (int colIx = 0; colIx < colCtxs.length; ++colIx) { + ColumnReadContext ctx = colCtxs[colIx]; + if (ctx == null) continue; // This column is not included. + for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { + StreamContext sctx = ctx.streams[streamIx]; + if (sctx == null || sctx.stripeLevelStream == null) continue; + if (0 != sctx.stripeLevelStream.decRef()) continue; + for (MemoryBuffer buf : sctx.stripeLevelStream.getCacheBuffers()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Unlocking {} at the end of processing", buf); + } + cacheWrapper.releaseBuffer(buf); + } + } + } + releaseInitialRefcounts(toRead.next); + // Release buffers as we are done with all the streams... also see toRelease comment. + if (toRelease != null) { + releaseBuffers(toRelease.keySet(), true); + } releaseCacheChunksIntoObjectPool(toRead.next); } @@ -605,8 +665,8 @@ class EncodedReaderImpl implements EncodedReader { * the master list, so they are safe to keep as iterators for various streams. 
*/ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset, - long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset) - throws IOException { + long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset, + IdentityHashMap<ByteBuffer, Boolean> toRelease) throws IOException { if (csd.getCacheBuffers() == null) { csd.setCacheBuffers(new ArrayList<MemoryBuffer>()); } else { @@ -615,10 +675,10 @@ class EncodedReaderImpl implements EncodedReader { if (cOffset == endCOffset) return null; boolean isCompressed = codec != null; List<ProcCacheChunk> toDecompress = null; - List<ByteBuffer> toRelease = null; List<IncompleteCb> badEstimates = null; + List<ByteBuffer> toReleaseCopies = null; if (isCompressed) { - toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>(); + toReleaseCopies = new ArrayList<>(); toDecompress = new ArrayList<>(); badEstimates = new ArrayList<>(); } @@ -636,8 +696,8 @@ class EncodedReaderImpl implements EncodedReader { // 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below). try { lastUncompressed = isCompressed ? - prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, - unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates) + prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset, unlockUntilCOffset, + current, csd, toRelease, toReleaseCopies, toDecompress, badEstimates) : prepareRangesForUncompressedRead( cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd); } catch (Exception ex) { @@ -657,7 +717,10 @@ class EncodedReaderImpl implements EncodedReader { assert result == null; // We don't expect conflicts from bad estimates. } - if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do. + if (toDecompress == null || toDecompress.isEmpty()) { + releaseBuffers(toReleaseCopies, false); + return lastUncompressed; // Nothing to do. + } // 3. Allocate the buffers, prepare cache keys. // At this point, we have read all the CBs we need to read. cacheBuffers contains some cache @@ -690,21 +753,18 @@ class EncodedReaderImpl implements EncodedReader { cacheWrapper.reuseBuffer(chunk.getBuffer()); } - // 5. Release original compressed buffers to zero-copy reader if needed. - if (toRelease != null) { - assert dataReader.isTrackingDiskRanges(); - for (ByteBuffer buffer : toRelease) { - dataReader.releaseBuffer(buffer); - } - } + // 5. Release the copies we made directly to the cleaner. + releaseBuffers(toReleaseCopies, false); // 6. Finally, put uncompressed data to cache. if (fileKey != null) { - long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset); + long[] collisionMask = cacheWrapper.putFileData( + fileKey, cacheKeys, targetBuffers, baseOffset); processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers()); } - // 7. It may happen that we know we won't use some compression buffers anymore. + // 7. It may happen that we know we won't use some cache buffers anymore (the alternative + // is that we will use the same buffers for other streams in separate calls). // Release initial refcounts. 
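// Illustrative sketch, not part of the patch: the "run refcount one ahead" scheme used for
// sctx.stripeLevelStream above. The stream starts with one creation reference, each row group
// that reuses it takes and later drops its own reference, and the cleanup pass at the end of
// readEncodedColumns() drops the creation reference; whoever brings the count to zero frees the
// underlying cache buffers. Class and method names here are invented for the example.
import java.util.concurrent.atomic.AtomicInteger;

class RefCountedStreamSketch {
  private final AtomicInteger refCount = new AtomicInteger(1);   // creation reference

  void retainForRowGroup()    { refCount.incrementAndGet(); }    // one per consumer

  void releaseAfterRowGroup() { release(); }

  void releaseCreationRef()   { release(); }                     // called once, at the very end

  private void release() {
    if (refCount.decrementAndGet() == 0) {
      // Only the last holder reaches this point; unlock/free the cache buffers here.
    }
  }
}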
for (ProcCacheChunk chunk : toDecompress) { ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk); @@ -713,9 +773,11 @@ class EncodedReaderImpl implements EncodedReader { return lastUncompressed; } + /** Subset of readEncodedStream specific to compressed streams, separate to avoid long methods. */ private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset, - long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData, - List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress, + long streamOffset, long unlockUntilCOffset, DiskRangeList current, + ColumnStreamData columnStreamData, IdentityHashMap<ByteBuffer, Boolean> toRelease, + List<ByteBuffer> toReleaseCopies, List<ProcCacheChunk> toDecompress, List<IncompleteCb> badEstimates) throws IOException { if (cOffset > current.getOffset()) { // Target compression block is in the middle of the range; slice the range in two. @@ -762,8 +824,8 @@ class EncodedReaderImpl implements EncodedReader { throw new RuntimeException(msg); } BufferChunk bc = (BufferChunk)current; - ProcCacheChunk newCached = addOneCompressionBuffer( - bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates); + ProcCacheChunk newCached = addOneCompressionBuffer(bc, columnStreamData.getCacheBuffers(), + toDecompress, toRelease, toReleaseCopies, badEstimates); lastUncompressed = (newCached == null) ? lastUncompressed : newCached; next = (newCached != null) ? newCached.next : null; currentOffset = (next != null) ? next.getOffset() : -1; @@ -777,9 +839,12 @@ class EncodedReaderImpl implements EncodedReader { return lastUncompressed; } + /** Subset of readEncodedStream specific to uncompressed streams, separate to avoid long methods. */ private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset, - long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData) - throws IOException { + long streamOffset, long unlockUntilCOffset, DiskRangeList current, + ColumnStreamData columnStreamData) throws IOException { + // Note: we are called after preReadUncompressedStream, so it doesn't have to do nearly as much + // as prepareRangesForCompressedRead does; e.g. every buffer is already a CacheChunk. long currentOffset = cOffset; CacheChunk lastUncompressed = null; boolean isFirst = true; @@ -819,11 +884,10 @@ class EncodedReaderImpl implements EncodedReader { * We could avoid copy in non-zcr case and manage the buffer that was not allocated by our * allocator. Uncompressed case is not mainline though so let's not complicate it. */ - private DiskRangeList preReadUncompressedStream(long baseOffset, - DiskRangeList start, long streamOffset, long streamEnd) throws IOException { + private DiskRangeList preReadUncompressedStream(long baseOffset, DiskRangeList start, + long streamOffset, long streamEnd) throws IOException { if (streamOffset == streamEnd) return null; List<UncompressedCacheChunk> toCache = null; - List<ByteBuffer> toRelease = null; // 1. Find our bearings in the stream. DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd); @@ -860,9 +924,6 @@ class EncodedReaderImpl implements EncodedReader { if (current.getOffset() >= partEnd) { continue; // We have no data at all for this part of the stream (could be unneeded), skip. } - if (toRelease == null && dataReader.isTrackingDiskRanges()) { - toRelease = new ArrayList<ByteBuffer>(); - } // We have some disk buffers... 
see if we have entire part, etc. UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part. DiskRangeList next = current; @@ -877,21 +938,15 @@ class EncodedReaderImpl implements EncodedReader { current = next; if (noMoreDataForPart) break; // Done with this part. - boolean wasSplit = false; if (current.getEnd() > partEnd) { // If the current buffer contains multiple parts, split it. current = current.split(partEnd); - wasSplit = true; } if (isTracingEnabled) { LOG.trace("Processing uncompressed file data at [" + current.getOffset() + ", " + current.getEnd() + ")"); } BufferChunk curBc = (BufferChunk)current; - if (!wasSplit && toRelease != null) { - toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part? - } - // Track if we still have the entire part. long hadEntirePartTo = hasEntirePartTo; // We have data until the end of current block if we had it until the beginning. @@ -952,15 +1007,7 @@ class EncodedReaderImpl implements EncodedReader { ++ix; } - // 5. Release original compressed buffers to zero-copy reader if needed. - if (toRelease != null) { - assert dataReader.isTrackingDiskRanges(); - for (ByteBuffer buf : toRelease) { - dataReader.releaseBuffer(buf); - } - } - - // 6. Finally, put uncompressed data to cache. + // 5. Put uncompressed data to cache. if (fileKey != null) { long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset); processCacheCollisions(collisionMask, toCache, targetBuffers, null); @@ -969,7 +1016,6 @@ class EncodedReaderImpl implements EncodedReader { return lastUncompressed; } - private int determineUncompressedPartSize() { // We will break the uncompressed data in the cache in the chunks that are the size // of the prevalent ORC compression buffer (the default), or maximum allocation (since we @@ -1178,7 +1224,8 @@ class EncodedReaderImpl implements EncodedReader { */ private ProcCacheChunk addOneCompressionBuffer(BufferChunk current, List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress, - List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException { + IdentityHashMap<ByteBuffer, Boolean> toRelease, List<ByteBuffer> toReleaseCopies, + List<IncompleteCb> badEstimates) throws IOException { ByteBuffer slice = null; ByteBuffer compressed = current.getChunk(); long cbStartOffset = current.getOffset(); @@ -1201,12 +1248,8 @@ class EncodedReaderImpl implements EncodedReader { // Simple case - CB fits entirely in the disk range. slice = compressed.slice(); slice.limit(chunkLength); - ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed, + return addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers); - if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) { - toRelease.add(compressed); - } - return cc; } if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) { badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0)); @@ -1216,6 +1259,7 @@ class EncodedReaderImpl implements EncodedReader { // TODO: we could remove extra copy for isUncompressed case by copying directly to cache. // We need to consolidate 2 or more buffers into one to decompress. ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect()); + toReleaseCopies.add(copy); // We will always release copies at the end. 
int remaining = chunkLength - compressed.remaining(); int originalPos = compressed.position(); copy.put(compressed); @@ -1224,12 +1268,8 @@ class EncodedReaderImpl implements EncodedReader { } DiskRangeList next = current.next; current.removeSelf(); - if (dataReader.isTrackingDiskRanges()) { - if (originalPos == 0) { - dataReader.releaseBuffer(compressed); // We copied the entire buffer. - } else { - toRelease.add(compressed); // There might be slices depending on this buffer. - } + if (originalPos == 0 && toRelease.remove(compressed)) { + releaseBuffer(compressed, true); } int extraChunkCount = 0; @@ -1246,15 +1286,15 @@ class EncodedReaderImpl implements EncodedReader { copy.put(slice); ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers); - if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) { - dataReader.releaseBuffer(compressed); // We copied the entire buffer. - } + if (compressed.remaining() <= 0 && toRelease.remove(compressed)) { + releaseBuffer(compressed, true); // We copied the entire buffer. + } // else there's more data to process; will be handled in next call. return cc; } remaining -= compressed.remaining(); - copy.put(compressed); - if (dataReader.isTrackingDiskRanges()) { - dataReader.releaseBuffer(compressed); // We copied the entire buffer. + copy.put(compressed); // TODO: move into the if below; account for release call + if (toRelease.remove(compressed)) { + releaseBuffer(compressed, true); // We copied the entire buffer. } DiskRangeList tmp = next; next = next.hasContiguousNext() ? next.next : null; @@ -1270,6 +1310,38 @@ class EncodedReaderImpl implements EncodedReader { } } + private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) { + if (toRelease == null) return; + for (ByteBuffer buf : toRelease) { + releaseBuffer(buf, isFromDataReader); + } + } + + private void releaseBuffer(ByteBuffer bb, boolean isFromDataReader) { + if (isTracingEnabled) { + LOG.trace("Releasing the buffer " + System.identityHashCode(bb)); + } + if (isFromDataReader && dataReader.isTrackingDiskRanges()) { + dataReader.releaseBuffer(bb); + return; + } + Field localCf = cleanerField; + if (!bb.isDirect() || localCf == null) return; + try { + Cleaner cleaner = (Cleaner) localCf.get(bb); + if (cleaner != null) { + cleaner.clean(); + } else { + LOG.debug("Unable to clean a buffer using cleaner - no cleaner"); + } + } catch (Exception e) { + // leave it for GC to clean up + LOG.warn("Unable to clean direct buffers using Cleaner."); + cleanerField = null; + } + } + + private IncompleteCb addIncompleteCompressionBuffer( long cbStartOffset, DiskRangeList target, int extraChunkCount) { IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd()); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java index 26f1e75..a7bb5ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetOutputFormat.java @@ -21,6 +21,7 @@ import java.util.Properties; import java.util.TimeZone; import 
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; import org.apache.parquet.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,14 +140,11 @@ public class MapredParquetOutputFormat extends FileOutputFormat<NullWritable, Pa String timeZoneID = tableProperties.getProperty(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY); if (!Strings.isNullOrEmpty(timeZoneID)) { - if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) { - throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID); - } + + NanoTimeUtils.validateTimeZone(timeZoneID); return TimeZone.getTimeZone(timeZoneID); } - // If no timezone is defined in table properties, then adjust timestamps using - // PARQUET_INT96_NO_ADJUSTMENT_ZONE timezone - return TimeZone.getTimeZone(ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE); + return TimeZone.getDefault(); } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java index 8e33b7d..2954601 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java @@ -20,6 +20,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetTableUtils; +import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -44,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.TimeZone; @@ -170,7 +170,7 @@ public class ParquetRecordReaderBase { boolean skipConversion = HiveConf.getBoolVar(configuration, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION); FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); - if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") || + if (!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr") && skipConversion) { // Impala writes timestamp values using GMT only. We should not try to convert Impala // files to other type of timezones. @@ -179,16 +179,12 @@ public class ParquetRecordReaderBase { // TABLE_PARQUET_INT96_TIMEZONE is a table property used to detect what timezone conversion // to use when reading Parquet timestamps. 
timeZoneID = configuration.get(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, - ParquetTableUtils.PARQUET_INT96_NO_ADJUSTMENT_ZONE); - - if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(timeZoneID)) { - throw new IllegalStateException("Unexpected timezone id found for parquet int96 conversion: " + timeZoneID); - } + TimeZone.getDefault().getID()); + NanoTimeUtils.validateTimeZone(timeZoneID); } // 'timeZoneID' should be valid, since we did not throw exception above - configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY, - TimeZone.getTimeZone(timeZoneID).getID()); + configuration.set(ParquetTableUtils.PARQUET_INT96_WRITE_ZONE_PROPERTY,timeZoneID); } public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java index 5dc8088..dbd6fb3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java @@ -152,13 +152,26 @@ public class NanoTimeUtils { calendar.setTimeInMillis(utcCalendar.getTimeInMillis()); - Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, Calendar.getInstance()); + Calendar adjusterCalendar = copyToCalendarWithTZ(calendar, getLocalCalendar()); Timestamp ts = new Timestamp(adjusterCalendar.getTimeInMillis()); ts.setNanos((int) nanos); return ts; } + /** + * Check if the string id is a valid java TimeZone id. + * TimeZone#getTimeZone will return "GMT" if the id cannot be understood. 
+ * @param timeZoneID + */ + public static void validateTimeZone(String timeZoneID) { + if (TimeZone.getTimeZone(timeZoneID).getID().equals("GMT") + && !"GMT".equals(timeZoneID)) { + throw new IllegalStateException( + "Unexpected timezone id found for parquet int96 conversion: " + timeZoneID); + } + } + private static Calendar copyToCalendarWithTZ(Calendar from, Calendar to) { if(from.getTimeZone().getID().equals(to.getTimeZone().getID())) { return from; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 6ca1963..312cdac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -121,8 +121,9 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase try { serDeStats = new SerDeStats(); projectionPusher = new ProjectionPusher(); - if (oldInputSplit != null) { - initialize(getSplit(oldInputSplit, conf), conf); + ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf); + if (inputSplit != null) { + initialize(inputSplit, conf); setTimeZoneConversion(jobConf, ((FileSplit) oldInputSplit).getPath()); } colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java index 00c9645..5401c7b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.common.StatsSetupConst.StatDB; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.Context; @@ -58,7 +57,6 @@ import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RunningJob; @@ -77,8 +75,6 @@ import org.apache.logging.log4j.core.appender.RollingFileAppender; @SuppressWarnings( { "deprecation"}) public class PartialScanTask extends Task<PartialScanWork> implements Serializable, HadoopJobExecHook { - - private static final long serialVersionUID = 1L; protected transient JobConf job; @@ -274,7 +270,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements return "RCFile Statistics Partial Scan"; } - public static String INPUT_SEPERATOR = ":"; + public static final String INPUT_SEPERATOR = ":"; public static void main(String[] args) { String 
inputPathStr = null; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index d255265..01e8a48 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -21,6 +21,11 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.LockTableDesc; +import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; +import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hive.common.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +62,13 @@ import java.util.concurrent.atomic.AtomicInteger; * with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method. * The later may (usually will) be called from a timer thread. * See {@link #getMS()} for more important concurrency/metastore access notes. + * + * Each statement that the TM (transaction manager) should be aware of should belong to a transaction. + * Effectively, that means any statement that has side effects. Exceptions are statements like + * Show Compactions, Show Tables, Use Database foo, etc. The transaction is started either + * explicitly ( via Start Transaction SQL statement from end user - not fully supported) or + * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks exactly as autoCommit=true + * from end user poit of view). See more at {@link #isExplicitTransaction}. */ public final class DbTxnManager extends HiveTxnManagerImpl { @@ -76,7 +88,47 @@ public final class DbTxnManager extends HiveTxnManagerImpl { * to keep apart multiple writes of the same data within the same transaction * Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options} */ - private int statementId = -1; + private int writeId = -1; + /** + * counts number of statements in the current transaction + */ + private int numStatements = 0; + /** + * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot + * include any Operations which cannot be rolled back (drop partition; write to non-acid table). + * If false, it's a single statement transaction which can include any statement. This is not a + * contradiction from the user point of view who doesn't know anything about the implicit txn + * and cannot call rollback (the statement of course can fail in which case there is nothing to + * rollback (assuming the statement is well implemented)). + * + * This is done so that all commands run in a transaction which simplifies implementation and + * allows a simple implementation of multi-statement txns which don't require a lock manager + * capable of deadlock detection. (todo: not fully implemented; elaborate on how this LM works) + * + * Also, critically important, ensuring that everything runs in a transaction assigns an order + * to all operations in the system - needed for replication/DR. 
+ * + * We don't want to allow non-transactional statements in a user demarcated txn because the effect + * of such statement is "visible" immediately on statement completion, but the user may + * issue a rollback but the action of the statement can't be undone (and has possibly already been + * seen by another txn). For example, + * start transaction + * insert into transactional_table values(1); + * insert into non_transactional_table select * from transactional_table; + * rollback + * + * The user would be in for a surprise especially if they are not aware of transactional + * properties of the tables involved. + * + * As a side note: what should the lock manager do with locks for non-transactional resources? + * Should it it release them at the end of the stmt or txn? + * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html + */ + private boolean isExplicitTransaction = false; + /** + * To ensure transactions don't nest. + */ + private int startTransactionCount = 0; // QueryId for the query in current transaction private String queryId; @@ -141,15 +193,22 @@ public final class DbTxnManager extends HiveTxnManagerImpl { @VisibleForTesting long openTxn(Context ctx, String user, long delay) throws LockException { - //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call - //whenever it chooses + /*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call + whenever it chooses + A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock + acquisition. Relying on locks is a pessimistic strategy which works better under high + contention.*/ init(); + getLockManager(); if(isTxnOpen()) { throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId)); } try { txnId = getMS().openTxn(user); - statementId = 0; + writeId = 0; + numStatements = 0; + isExplicitTransaction = false; + startTransactionCount = 0; LOG.debug("Opened " + JavaUtils.txnIdToString(txnId)); ctx.setHeartbeater(startHeartbeat(delay)); return txnId; @@ -159,8 +218,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl { } /** - * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will - * be read by a different threads that one writing it, thus it's {@code volatile} + * we don't expect multiple threads to call this method concurrently but {@link #lockMgr} will + * be read by a different threads than one writing it, thus it's {@code volatile} */ @Override public HiveLockManager getLockManager() throws LockException { @@ -179,24 +238,95 @@ public final class DbTxnManager extends HiveTxnManagerImpl { catch(LockException e) { if(e.getCause() instanceof TxnAbortedException) { txnId = 0; - statementId = -1; + writeId = -1; } throw e; } } /** - * This is for testing only. 
Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)} + * Watermark to include in error msgs and logs + * @param queryPlan + * @return + */ + private static String getQueryIdWaterMark(QueryPlan queryPlan) { + return "queryId=" + queryPlan.getQueryId(); + } + + private void markExplicitTransaction(QueryPlan queryPlan) throws LockException { + isExplicitTransaction = true; + if(++startTransactionCount > 1) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(), + JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId()); + } + + } + /** + * Ensures that the current SQL statement is appropriate for the current state of the + * Transaction Manager (e.g. can call commit unless you called start transaction) + * + * Note that support for multi-statement txns is a work-in-progress so it's only supported in + * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST. + * @param queryPlan + * @throws LockException + */ + private void verifyState(QueryPlan queryPlan) throws LockException { + if(!isTxnOpen()) { + throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + + " for " + getQueryIdWaterMark(queryPlan)); + } + if(queryPlan.getOperation() == null) { + throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan)); + } + numStatements++; + switch (queryPlan.getOperation()) { + case START_TRANSACTION: + markExplicitTransaction(queryPlan); + break; + case COMMIT: + case ROLLBACK: + if(!isTxnOpen()) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName()); + } + if(!isExplicitTransaction) { + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName()); + } + break; + default: + if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) { + //for example, drop table in an explicit txn is not allowed + //in some cases this requires looking at more than just the operation + //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table + throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(), + JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId()); + } + } + /* + Should we allow writing to non-transactional tables in an explicit transaction? The user may + issue ROLLBACK but these tables won't rollback. + Can do this by checking ReadEntity/WriteEntity to determine whether it's reading/writing + any non acid and raise an appropriate error + * Driver.acidSinks and Driver.acidInQuery can be used if any acid is in the query*/ + } + /** + * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)} * @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING * @return null if no locks were needed */ + @VisibleForTesting LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException { init(); - // Make sure we've built the lock manager + // Make sure we've built the lock manager getLockManager(); - + verifyState(plan); boolean atLeastOneLock = false; queryId = plan.getQueryId(); + switch (plan.getOperation()) { + case SET_AUTOCOMMIT: + /**This is here for documentation purposes. 
This TM doesn't support this - only has one + * mode of operation documented at {@link DbTxnManager#isExplicitTransaction}*/ + return null; + } LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId); //link queryId to txnId @@ -240,8 +370,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl { // This is a file or something we don't hold locks for. continue; } - if(t != null && AcidUtils.isFullAcidTable(t)) { - compBuilder.setIsAcid(true); + if(t != null) { + compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t)); } LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); @@ -262,7 +392,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl { } LockComponentBuilder compBuilder = new LockComponentBuilder(); Table t = null; + switch (output.getType()) { + case DATABASE: + compBuilder.setDbName(output.getDatabase().getName()); + break; + + case TABLE: + case DUMMYPARTITION: // in case of dynamic partitioning lock the table + t = output.getTable(); + compBuilder.setDbName(t.getDbName()); + compBuilder.setTableName(t.getTableName()); + break; + + case PARTITION: + compBuilder.setPartitionName(output.getPartition().getName()); + t = output.getPartition().getTable(); + compBuilder.setDbName(t.getDbName()); + compBuilder.setTableName(t.getTableName()); + break; + + default: + // This is a file or something we don't hold locks for. + continue; + } switch (output.getWriteType()) { + /* base this on HiveOperation instead? this and DDL_NO_LOCK is peppered all over the code... + Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think + makes sense everywhere). This however would be problematic for merge...*/ case DDL_EXCLUSIVE: case INSERT_OVERWRITE: compBuilder.setExclusive(); @@ -270,10 +426,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl { break; case INSERT: - t = getTable(output); + assert t != null; if(AcidUtils.isFullAcidTable(t)) { compBuilder.setShared(); - compBuilder.setIsAcid(true); } else { if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) { @@ -281,7 +436,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl { } else { // this is backward compatible for non-ACID resources, w/o ACID semantics compBuilder.setShared(); } - compBuilder.setIsAcid(false); } compBuilder.setOperationType(DataOperationType.INSERT); break; @@ -293,12 +447,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl { case UPDATE: compBuilder.setSemiShared(); compBuilder.setOperationType(DataOperationType.UPDATE); - t = getTable(output); break; case DELETE: compBuilder.setSemiShared(); compBuilder.setOperationType(DataOperationType.DELETE); - t = getTable(output); break; case DDL_NO_LOCK: @@ -307,34 +459,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl { default: throw new RuntimeException("Unknown write type " + output.getWriteType().toString()); - } - switch (output.getType()) { - case DATABASE: - compBuilder.setDbName(output.getDatabase().getName()); - break; - - case TABLE: - case DUMMYPARTITION: // in case of dynamic partitioning lock the table - t = output.getTable(); - compBuilder.setDbName(t.getDbName()); - compBuilder.setTableName(t.getTableName()); - break; - - case PARTITION: - compBuilder.setPartitionName(output.getPartition().getName()); - t = output.getPartition().getTable(); - compBuilder.setDbName(t.getDbName()); - compBuilder.setTableName(t.getTableName()); - break; - - default: - // This is a file or something we 
don't hold locks for. - continue; - } - if(t != null && AcidUtils.isFullAcidTable(t)) { - compBuilder.setIsAcid(true); + if(t != null) { + compBuilder.setIsAcid(AcidUtils.isFullAcidTable(t)); } + compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite()); LockComponent comp = compBuilder.build(); LOG.debug("Adding lock component to lock request " + comp.toString()); @@ -405,7 +534,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl { e); } finally { txnId = 0; - statementId = -1; + writeId = -1; + numStatements = 0; } } @@ -429,7 +559,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl { e); } finally { txnId = 0; - statementId = -1; + writeId = -1; + numStatements = 0; } } @@ -556,6 +687,26 @@ public final class DbTxnManager extends HiveTxnManagerImpl { public boolean supportsExplicitLock() { return false; } + @Override + public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException { + super.lockTable(db, lockTbl); + throw new UnsupportedOperationException(); + } + @Override + public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException { + super.unlockTable(hiveDB, unlockTbl); + throw new UnsupportedOperationException(); + } + @Override + public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException { + super.lockDatabase(hiveDB, lockDb); + throw new UnsupportedOperationException(); + } + @Override + public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException { + super.unlockDatabase(hiveDB, unlockDb); + throw new UnsupportedOperationException(); + } @Override public boolean useNewShowLocksFormat() { @@ -566,7 +717,44 @@ public final class DbTxnManager extends HiveTxnManagerImpl { public boolean supportsAcid() { return true; } - + /** + * In an explicit txn start_transaction is the 1st statement and we record the snapshot at the + * start of the txn for Snapshot Isolation. For Read Committed (not supported yet) we'd record + * it before executing each statement (but after lock acquisition if using lock based concurrency + * control). 
+ * For implicit txn, the stmt that triggered/started the txn is the first statement + */ + @Override + public boolean recordSnapshot(QueryPlan queryPlan) { + assert isTxnOpen(); + assert numStatements > 0 : "was acquireLocks() called already?"; + if(queryPlan.getOperation() == HiveOperation.START_TRANSACTION) { + //here if start of explicit txn + assert isExplicitTransaction; + assert numStatements == 1; + return true; + } + else if(!isExplicitTransaction) { + assert numStatements == 1 : "numStatements=" + numStatements + " in implicit txn"; + if (queryPlan.hasAcidResourcesInQuery()) { + //1st and only stmt in implicit txn and uses acid resource + return true; + } + } + return false; + } + @Override + public boolean isImplicitTransactionOpen() { + if(!isTxnOpen()) { + //some commands like "show databases" don't start implicit transactions + return false; + } + if(!isExplicitTransaction) { + assert numStatements == 1 : "numStatements=" + numStatements; + return true; + } + return false; + } @Override protected void destruct() { try { @@ -626,7 +814,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl { @Override public int getWriteIdAndIncrement() { assert isTxnOpen(); - return statementId++; + return writeId++; } private static long getHeartbeatInterval(Configuration conf) throws LockException { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java index 53ee9c8..24df25b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java @@ -301,7 +301,8 @@ class DummyTxnManager extends HiveTxnManagerImpl { new HiveLockObject.HiveLockObjectData(plan.getQueryId(), String.valueOf(System.currentTimeMillis()), "IMPLICIT", - plan.getQueryStr()); + plan.getQueryStr(), + conf); if (db != null) { locks.add(new HiveLockObj(new HiveLockObject(db.getName(), lockData), http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java index fff03df..a514339 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.hive.common.StringInternUtils; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -48,16 +49,23 @@ public class HiveLockObject { * Note: The parameters are used to uniquely identify a HiveLockObject. * The parameters will be stripped off any ':' characters in order not * to interfere with the way the data is serialized (':' delimited string). 
+ * The query string might be truncated depending on HIVE_LOCK_QUERY_STRING_MAX_LENGTH + * @param queryId The query identifier will be added to the object without change + * @param lockTime The lock time will be added to the object without change + * @param lockMode The lock mode will be added to the object without change + * @param queryStr The query string might be truncated based on + * HIVE_LOCK_QUERY_STRING_MAX_LENGTH conf variable + * @param conf The hive configuration based on which we decide if we should truncate the query + * string or not */ - public HiveLockObjectData(String queryId, - String lockTime, - String lockMode, - String queryStr) { + public HiveLockObjectData(String queryId, String lockTime, String lockMode, String queryStr, + HiveConf conf) { this.queryId = removeDelimiter(queryId); this.lockTime = StringInternUtils.internIfNotNull(removeDelimiter(lockTime)); this.lockMode = removeDelimiter(lockMode); this.queryStr = StringInternUtils.internIfNotNull( - removeDelimiter(queryStr == null ? null : queryStr.trim())); + queryStr == null ? null : StringUtils.substring(removeDelimiter(queryStr.trim()), 0, + conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_QUERY_STRING_MAX_LENGTH))); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java index 187a658..b24351c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java @@ -72,7 +72,7 @@ public interface HiveTxnManager { /** * Acquire all of the locks needed by a query. If used with a query that - * requires transactions, this should be called after {@link #openTxn(String)}. + * requires transactions, this should be called after {@link #openTxn(Context, String)}. * A list of acquired locks will be stored in the * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}. @@ -208,17 +208,13 @@ public interface HiveTxnManager { boolean supportsAcid(); /** - * This behaves exactly as - * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean) - */ - void setAutoCommit(boolean autoCommit) throws LockException; - - /** - * This behaves exactly as - * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit() + * For resources that support MVCC, the state of the DB must be recorded for the duration of the + * operation/transaction. Returns {@code true} if current statment needs to do this. 
*/ - boolean getAutoCommit(); + boolean recordSnapshot(QueryPlan queryPlan); + boolean isImplicitTransactionOpen(); + boolean isTxnOpen(); /** * if {@code isTxnOpen()}, returns the currently active transaction ID http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java index a371a5a..8dbbf87 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; abstract class HiveTxnManagerImpl implements HiveTxnManager { protected HiveConf conf; - private boolean isAutoCommit = true;//true by default; matches JDBC spec void setHiveConf(HiveConf c) { conf = c; @@ -68,16 +67,6 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager { destruct(); } @Override - public void setAutoCommit(boolean autoCommit) throws LockException { - isAutoCommit = autoCommit; - } - - @Override - public boolean getAutoCommit() { - return isAutoCommit; - } - - @Override public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException { HiveLockManager lockMgr = getAndCheckLockManager(); @@ -93,7 +82,8 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager { new HiveLockObjectData(lockTbl.getQueryId(), String.valueOf(System.currentTimeMillis()), "EXPLICIT", - lockTbl.getQueryStr()); + lockTbl.getQueryStr(), + conf); if (partSpec == null) { HiveLock lck = lockMgr.lock(new HiveLockObject(tbl, lockData), mode, true); @@ -151,7 +141,7 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager { HiveLockObjectData lockData = new HiveLockObjectData(lockDb.getQueryId(), String.valueOf(System.currentTimeMillis()), - "EXPLICIT", lockDb.getQueryStr()); + "EXPLICIT", lockDb.getQueryStr(), conf); HiveLock lck = lockMgr.lock(new HiveLockObject(dbObj.getName(), lockData), mode, true); if (lck == null) { @@ -202,4 +192,13 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager { return lockMgr; } + @Override + public boolean recordSnapshot(QueryPlan queryPlan) { + return false; + } + @Override + public boolean isImplicitTransactionOpen() { + return true; + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java index c2a4806..9b46ae7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/zookeeper/ZooKeeperHiveLockManager.java @@ -285,8 +285,10 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { int tryNum = 0; ZooKeeperHiveLock ret = null; Set<String> conflictingLocks = new HashSet<String>(); + Exception lastException = null; do { + lastException = null; tryNum++; try { if (tryNum > 1) { @@ -298,26 +300,22 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { break; } } catch (Exception e1) { + lastException = e1; if (e1 instanceof KeeperException) { KeeperException e = 
(KeeperException) e1; switch (e.code()) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: + case NONODE: + case NODEEXISTS: LOG.debug("Possibly transient ZooKeeper exception: ", e); - continue; + break; default: LOG.error("Serious Zookeeper exception: ", e); break; } - } - if (tryNum >= numRetriesForLock) { - console.printError("Unable to acquire " + key.getData().getLockMode() - + ", " + mode + " lock " + key.getDisplayName() + " after " - + tryNum + " attempts."); - LOG.error("Exceeds maximum retries with errors: ", e1); - printConflictingLocks(key,mode,conflictingLocks); - conflictingLocks.clear(); - throw new LockException(e1); + } else { + LOG.error("Other unexpected exception: ", e1); } } } while (tryNum < numRetriesForLock); @@ -327,8 +325,11 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { + ", " + mode + " lock " + key.getDisplayName() + " after " + tryNum + " attempts."); printConflictingLocks(key,mode,conflictingLocks); + if (lastException != null) { + LOG.error("Exceeds maximum retries with errors: ", lastException); + throw new LockException(lastException); + } } - conflictingLocks.clear(); return ret; } @@ -350,6 +351,19 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { } } + /** + * Creates a primitive lock object on ZooKeeper. + * @param key The lock data + * @param mode The lock mode (HiveLockMode - EXCLUSIVE/SHARED/SEMI_SHARED) + * @param keepAlive If true, create PERSISTENT ZooKeeper locks, otherwise EPHEMERAL ZooKeeper + * locks + * @param parentCreated True if we expect the parent to be already created, otherwise + * we will try to create the parents as well + * @param conflictingLocks The set where we should collect the conflicting locks when + * the logging level is set to DEBUG + * @return The created ZooKeeperHiveLock object, null if there was a conflicting lock + * @throws Exception If there was an unexpected Exception + */ private ZooKeeperHiveLock lockPrimitive(HiveLockObject key, HiveLockMode mode, boolean keepAlive, boolean parentCreated, Set<String> conflictingLocks) @@ -390,7 +404,7 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { int seqNo = getSequenceNumber(res, getLockName(lastName, mode)); if (seqNo == -1) { curatorFramework.delete().forPath(res); - return null; + throw new LockException("The created node does not contain a sequence number: " + res); } List<String> children = curatorFramework.getChildren().forPath(lastName); @@ -584,7 +598,6 @@ public class ZooKeeperHiveLockManager implements HiveLockManager { /** * @param conf Hive configuration - * @param zkpClient The ZooKeeper client * @param key The object to be compared against - if key is null, then get all locks **/ private static List<HiveLock> getLocks(HiveConf conf, http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java new file mode 100644 index 0000000..64ce100 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.log; + +import java.util.regex.Pattern; + +import org.apache.hadoop.hive.common.LogUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.OperationLog; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.appender.RandomAccessFileAppender; +import org.apache.logging.log4j.core.appender.routing.Route; +import org.apache.logging.log4j.core.appender.routing.Routes; +import org.apache.logging.log4j.core.appender.routing.RoutingAppender; +import org.apache.logging.log4j.core.config.Configuration; +import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.logging.log4j.core.config.Node; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.core.config.plugins.processor.PluginEntry; +import org.apache.logging.log4j.core.config.plugins.util.PluginType; +import org.apache.logging.log4j.core.filter.AbstractFilter; +import org.apache.logging.log4j.core.layout.PatternLayout; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; + +/** + * Divert appender to redirect operation logs to separate files. + */ +public class LogDivertAppender { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogDivertAppender.class.getName()); + public static final String verboseLayout = "%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n"; + public static final String nonVerboseLayout = "%-5p : %m%n"; + + /** + * A log filter that filters messages coming from the loggers with the given names. + * It can be used as a white list filter or a black list filter. + * We apply the black list filter on the Loggers used by the log diversion code, so that + * they don't generate more logs for themselves when they process logs. + * The white list filter is used for less verbose log collection. + */ + @Plugin(name = "NameFilter", category = "Core", elementType="filter", printObject = true) + private static class NameFilter extends AbstractFilter { + private Pattern namePattern; + private OperationLog.LoggingLevel loggingMode; + + /* Patterns that are excluded in verbose logging level. + * Filter out messages coming from log processing classes, or we'll run into an infinite loop. + */ + private static final Pattern verboseExcludeNamePattern = Pattern.compile(Joiner.on("|"). + join(new String[]{LOG.getName(), OperationLog.class.getName()})); + + /* Patterns that are included in execution logging level. + * In execution mode, show only select logger messages.
+ */ + private static final Pattern executionIncludeNamePattern = Pattern.compile(Joiner.on("|"). + join(new String[]{"org.apache.hadoop.mapreduce.JobSubmitter", + "org.apache.hadoop.mapreduce.Job", "SessionState", Task.class.getName(), + Driver.class.getName(), "org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor"})); + + /* Patterns that are included in performance logging level. + * In performance mode, show execution and performance logger messages. + */ + private static final Pattern performanceIncludeNamePattern = Pattern.compile( + executionIncludeNamePattern.pattern() + "|" + PerfLogger.class.getName()); + + private void setCurrentNamePattern(OperationLog.LoggingLevel mode) { + if (mode == OperationLog.LoggingLevel.VERBOSE) { + this.namePattern = verboseExcludeNamePattern; + } else if (mode == OperationLog.LoggingLevel.EXECUTION) { + this.namePattern = executionIncludeNamePattern; + } else if (mode == OperationLog.LoggingLevel.PERFORMANCE) { + this.namePattern = performanceIncludeNamePattern; + } + } + + public NameFilter(OperationLog.LoggingLevel loggingMode) { + this.loggingMode = loggingMode; + setCurrentNamePattern(loggingMode); + } + + @Override + public Result filter(LogEvent event) { + boolean excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE); + + String logLevel = event.getContextMap().get(LogUtils.OPERATIONLOG_LEVEL_KEY); + logLevel = logLevel == null ? "" : logLevel; + OperationLog.LoggingLevel currentLoggingMode = OperationLog.getLoggingLevel(logLevel); + // If logging is disabled, deny everything. + if (currentLoggingMode == OperationLog.LoggingLevel.NONE) { + return Result.DENY; + } + // Look at the current session's setting + // and set the pattern and excludeMatches accordingly. + if (currentLoggingMode != loggingMode) { + loggingMode = currentLoggingMode; + excludeMatches = (loggingMode == OperationLog.LoggingLevel.VERBOSE); + setCurrentNamePattern(loggingMode); + } + + boolean isMatch = namePattern.matcher(event.getLoggerName()).matches(); + + if (excludeMatches == isMatch) { + // Deny if this is black-list filter (excludeMatches = true) and it + // matched or if this is whitelist filter and it didn't match + return Result.DENY; + } + + return Result.NEUTRAL; + } + + @PluginFactory + public static NameFilter createFilter( + @PluginAttribute("loggingLevel") final String loggingLevel) { + // Name required for routing. Error out if it is not set. + Preconditions.checkNotNull(loggingLevel, + "loggingLevel must be specified for " + NameFilter.class.getName()); + + return new NameFilter(OperationLog.getLoggingLevel(loggingLevel)); + } + } + + /** + * Programmatically register a routing appender to Log4J configuration, which + * automatically writes the log of each query to an individual file. 
+ * The equivalent property configuration is as follows: + * # queryId based routing file appender + appender.query-routing.type = Routing + appender.query-routing.name = query-routing + appender.query-routing.routes.type = Routes + appender.query-routing.routes.pattern = $${ctx:queryId} + # default route + appender.query-routing.routes.route-default.type = Route + appender.query-routing.routes.route-default.key = $${ctx:queryId} + appender.query-routing.routes.route-default.app.type = null + appender.query-routing.routes.route-default.app.name = Null + # queryId based route + appender.query-routing.routes.route-mdc.type = Route + appender.query-routing.routes.route-mdc.name = IrrelevantName-query-routing + appender.query-routing.routes.route-mdc.app.type = RandomAccessFile + appender.query-routing.routes.route-mdc.app.name = query-file-appender + appender.query-routing.routes.route-mdc.app.fileName = ${sys:hive.log.dir}/${ctx:sessionId}/${ctx:queryId} + appender.query-routing.routes.route-mdc.app.layout.type = PatternLayout + appender.query-routing.routes.route-mdc.app.layout.pattern = %d{ISO8601} %5p %c{2}: %m%n + * @param conf the configuration for the HiveServer2 instance + */ + public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration conf) { + String loggingLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL); + OperationLog.LoggingLevel loggingMode = OperationLog.getLoggingLevel(loggingLevel); + String layout = loggingMode == OperationLog.LoggingLevel.VERBOSE ? verboseLayout : nonVerboseLayout; + String logLocation = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION); + + // Create NullAppender + PluginEntry nullEntry = new PluginEntry(); + nullEntry.setClassName(NullAppender.class.getName()); + nullEntry.setKey("null"); + nullEntry.setName("appender"); + PluginType<NullAppender> nullChildType = new PluginType<NullAppender>(nullEntry, NullAppender.class, "appender"); + Node nullChildNode = new Node(null, "Null", nullChildType); + + // Create default route + PluginEntry defaultEntry = new PluginEntry(); + defaultEntry.setClassName(Route.class.getName()); + defaultEntry.setKey("route"); + defaultEntry.setName("Route"); + PluginType<Route> defaultType = new PluginType<Route>(defaultEntry, Route.class, "Route"); + Node nullNode = new Node(null, "Route", defaultType); + nullNode.getChildren().add(nullChildNode); + Route defaultRoute = Route.createRoute(null, "${ctx:queryId}", nullNode); + + // Create queryId based route + PluginEntry entry = new PluginEntry(); + entry.setClassName(Route.class.getName()); + entry.setKey("route"); + entry.setName("Route"); + PluginType<Route> type = new PluginType<Route>(entry, Route.class, "Route"); + Node node = new Node(null, "Route", type); + + PluginEntry childEntry = new PluginEntry(); + childEntry.setClassName(RandomAccessFileAppender.class.getName()); + childEntry.setKey("randomaccessfile"); + childEntry.setName("appender"); + PluginType<RandomAccessFileAppender> childType = new PluginType<RandomAccessFileAppender>(childEntry, RandomAccessFileAppender.class, "appender"); + Node childNode = new Node(node, "RandomAccessFile", childType); + childNode.getAttributes().put("name", "query-file-appender"); + childNode.getAttributes().put("fileName", logLocation + "/${ctx:sessionId}/${ctx:queryId}"); + node.getChildren().add(childNode); + + PluginEntry filterEntry = new PluginEntry(); + filterEntry.setClassName(NameFilter.class.getName()); +
filterEntry.setKey("namefilter"); + filterEntry.setName("namefilter"); + PluginType<NameFilter> filterType = new PluginType<NameFilter>(filterEntry, NameFilter.class, "filter"); + Node filterNode = new Node(childNode, "NameFilter", filterType); + filterNode.getAttributes().put("loggingLevel", loggingMode.name()); + childNode.getChildren().add(filterNode); + + PluginEntry layoutEntry = new PluginEntry(); + layoutEntry.setClassName(PatternLayout.class.getName()); + layoutEntry.setKey("patternlayout"); + layoutEntry.setName("layout"); + PluginType<PatternLayout> layoutType = new PluginType<PatternLayout>(layoutEntry, PatternLayout.class, "layout"); + Node layoutNode = new Node(childNode, "PatternLayout", layoutType); + layoutNode.getAttributes().put("pattern", layout); + childNode.getChildren().add(layoutNode); + + Route mdcRoute = Route.createRoute(null, null, node); + Routes routes = Routes.createRoutes("${ctx:queryId}", defaultRoute, mdcRoute); + + LoggerContext context = (LoggerContext) LogManager.getContext(false); + Configuration configuration = context.getConfiguration(); + + RoutingAppender routingAppender = RoutingAppender.createAppender("query-routing", + "true", + routes, + configuration, + null, + null, + null); + + LoggerConfig loggerConfig = configuration.getRootLogger(); + loggerConfig.addAppender(routingAppender, null, null); + context.updateLoggers(); + routingAppender.start(); + } +}
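
For illustration only: the query string truncation that the new HiveLockObjectData constructor applies (see the HiveLockObject.java hunk above) can be sketched as a small standalone Java program. The class name, the truncateQueryString helper, and the hard-coded limit are hypothetical stand-ins; the committed code strips ':' delimiters via removeDelimiter() and reads the limit from HiveConf.ConfVars.HIVE_LOCK_QUERY_STRING_MAX_LENGTH, using org.apache.commons.lang.StringUtils.substring(), which behaves like the bounded substring below.

public class QueryStringTruncationSketch {
  // Hypothetical stand-in for conf.getIntVar(HiveConf.ConfVars.HIVE_LOCK_QUERY_STRING_MAX_LENGTH).
  private static final int MAX_LENGTH = 10;

  // Mirrors the constructor logic: trim, drop ':' delimiters, then cap the length.
  static String truncateQueryString(String queryStr) {
    if (queryStr == null) {
      return null;
    }
    String cleaned = queryStr.trim().replace(":", "");
    return cleaned.substring(0, Math.min(cleaned.length(), MAX_LENGTH));
  }

  public static void main(String[] args) {
    // Prints "select * f": only the first MAX_LENGTH characters end up in the lock data.
    System.out.println(truncateQueryString("select * from sample_table where col = 1"));
  }
}

A larger real-world limit simply keeps more of the query text; the point is that lock objects no longer store arbitrarily long query strings.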