This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new cfe90d5  HIVE-9995: ACID compaction tries to compact a single file (Denys Kuzmenko, reviewed by Peter Vary)
cfe90d5 is described below

commit cfe90d57d9a9c3d3c2f26fcbd7254e040833362e
Author: Denys Kuzmenko <dkuzme...@cloudera.com>
AuthorDate: Tue Apr 9 10:53:12 2019 +0200

    HIVE-9995: ACID compaction tries to compact a single file (Denys Kuzmenko, reviewed by Peter Vary)
---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    | 92 ++++++++++++----------
 .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java  |  4 +-
 .../ql/io/orc/VectorizedOrcAcidRowBatchReader.java |  2 +-
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  | 88 +++++++++++++--------
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  2 +-
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   | 18 ++---
 6 files changed, 116 insertions(+), 90 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index af8743d..24fc0d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -723,7 +723,10 @@ public class AcidUtils {
     }
   }
 
-  public static interface Directory {
+  /**
+   * Interface used to provide ACID directory information.
+   */
+  public interface Directory {
 
     /**
      * Get the base directory.
@@ -1080,12 +1083,15 @@
 
   /**
    * Is the given directory in ACID format?
-   * @param fileSystem file system instance
    * @param directory the partition directory to check
    * @param conf the query configuration
    * @return true, if it is an ACID directory
    * @throws IOException
    */
+  public static boolean isAcid(Path directory, Configuration conf) throws IOException {
+    return isAcid(null, directory, conf);
+  }
+
   public static boolean isAcid(FileSystem fileSystem, Path directory,
       Configuration conf) throws IOException {
     FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem;
@@ -1105,9 +1111,8 @@
   @VisibleForTesting
   public static Directory getAcidState(Path directory, Configuration conf,
-      ValidWriteIdList writeIdList
-      ) throws IOException {
-    return getAcidState(null, directory, conf, writeIdList, false, false);
+      ValidWriteIdList writeIdList) throws IOException {
+    return getAcidState(directory, conf, writeIdList, false, false);
   }
 
   /** State class for getChildState; cannot modify 2 things in a method. */
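For illustration, the new isAcid(Path, Configuration) overload added above can be called without a FileSystem handle; the existing variant resolves one from the path when it is passed null. A minimal caller sketch, not part of this commit; the partition location below is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class IsAcidExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical partition location, for illustration only.
    Path partitionDir = new Path("/warehouse/managed/t1/p=1");
    // The FileSystem is resolved internally from the Path when not supplied.
    System.out.println(partitionDir + " is ACID: " + AcidUtils.isAcid(partitionDir, conf));
  }
}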
@@ -1123,24 +1128,35 @@
    * base and diff directories.  Note that because major compactions don't
    * preserve the history, we can't use a base directory that includes a
    * write id that we must exclude.
-   * @param fileSystem file system instance
    * @param directory the partition directory to analyze
    * @param conf the configuration
    * @param writeIdList the list of write ids that we are reading
    * @return the state of the directory
    * @throws IOException
    */
-  public static Directory getAcidState(FileSystem fileSystem, Path directory,
-                                       Configuration conf,
+  public static Directory getAcidState(Path directory, Configuration conf,
+                                       ValidWriteIdList writeIdList,
+                                       boolean useFileIds,
+                                       boolean ignoreEmptyFiles) throws IOException {
+    return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
+  }
+
+  public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf,
                                        ValidWriteIdList writeIdList,
                                        boolean useFileIds,
-                                       boolean ignoreEmptyFiles
-                                       ) throws IOException {
+                                       boolean ignoreEmptyFiles) throws IOException {
     return getAcidState(fileSystem, directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
   }
 
-  public static Directory getAcidState(FileSystem fileSystem, Path directory,
-                                       Configuration conf,
+  public static Directory getAcidState(Path directory, Configuration conf,
+                                       ValidWriteIdList writeIdList,
+                                       Ref<Boolean> useFileIds,
+                                       boolean ignoreEmptyFiles,
+                                       Map<String, String> tblproperties) throws IOException {
+    return getAcidState(null, directory, conf, writeIdList, useFileIds, ignoreEmptyFiles, tblproperties);
+  }
+
+  public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf,
                                        ValidWriteIdList writeIdList,
                                        Ref<Boolean> useFileIds,
                                        boolean ignoreEmptyFiles,
@@ -1169,21 +1185,8 @@
     List<Path> originalDirectories = new ArrayList<>();
     final List<Path> obsolete = new ArrayList<>();
     final List<Path> abortedDirectories = new ArrayList<>();
-    List<HdfsFileStatusWithId> childrenWithId = null;
-    Boolean val = useFileIds.value;
-    if (val == null || val) {
-      try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
-        if (val == null) {
-          useFileIds.value = true;
-        }
-      } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        if (val == null && t instanceof UnsupportedOperationException) {
-          useFileIds.value = false;
-        }
-      }
-    }
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory);
+
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
     if (childrenWithId != null) {
@@ -1452,21 +1455,7 @@
   public static void findOriginals(FileSystem fs, Path dir, List<HdfsFileStatusWithId> original,
       Ref<Boolean> useFileIds, boolean ignoreEmptyFiles, boolean recursive) throws IOException {
-    List<HdfsFileStatusWithId> childrenWithId = null;
-    Boolean val = useFileIds.value;
-    if (val == null || val) {
-      try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, dir, hiddenFileFilter);
-        if (val == null) {
-          useFileIds.value = true;
-        }
-      } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        if (val == null && t instanceof UnsupportedOperationException) {
-          useFileIds.value = false;
-        }
-      }
-    }
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, dir);
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         if (child.getFileStatus().isDirectory()) {
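The new getAcidState overloads above all delegate to the FileSystem-taking variants with null, so the file system is resolved from the directory path. A caller sketch under that assumption; the wrapper method and parameter names here are illustrative, not part of the commit:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class AcidStateExample {
  // Illustrative wrapper: prints the directory state for one partition.
  static void printState(Path partitionDir, Configuration conf, ValidWriteIdList writeIds)
      throws IOException {
    // useFileIds=false, ignoreEmptyFiles=true, mirroring the call sites changed below.
    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionDir, conf, writeIds, false, true);
    System.out.println("base: " + dir.getBaseDirectory()
        + ", deltas: " + dir.getCurrentDirectories().size()
        + ", originals: " + dir.getOriginalFiles().size());
  }
}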
@@ -1496,6 +1485,25 @@
     }
   }
 
+  private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds,
+      FileSystem fs, Path directory) {
+    Boolean val = useFileIds.value;
+    List<HdfsFileStatusWithId> childrenWithId = null;
+    if (val == null || val) {
+      try {
+        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+        if (val == null) {
+          useFileIds.value = true;
+        }
+      } catch (Throwable t) {
+        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+        if (val == null && t instanceof UnsupportedOperationException) {
+          useFileIds.value = false;
+        }
+      }
+    }
+    return childrenWithId;
+  }
 
   public static boolean isTablePropertyTransactional(Properties props) {
     String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index fbbddba..b1ede05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -463,7 +463,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
      */
     //the split is from something other than the 1st file of the logical bucket - compute offset
     AcidUtils.Directory directoryState
-        = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, validWriteIdList, false,
+        = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false,
         true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
       int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
@@ -578,7 +578,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
     assert options.getOffset() == 0;
     assert options.getMaxOffset() == Long.MAX_VALUE;
     AcidUtils.Directory directoryState
-        = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, validWriteIdList, false, true);
+        = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true);
     /**
      * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
      * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 18c35f2..15f1f94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -722,7 +722,7 @@ public class VectorizedOrcAcidRowBatchReader
     int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
       //statementId is from directory name (or 0 if there is none)
       .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
-    AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf,
+    AcidUtils.Directory directoryState = AcidUtils.getAcidState(syntheticTxnInfo.folder, conf,
         validWriteIdList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
       int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
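The extracted tryListLocatedHdfsStatus helper above keeps the existing tri-state probe: a null Ref<Boolean> means the fast listing API has not been tried yet, true means it is known to work, and false (set on UnsupportedOperationException) disables further attempts. A self-contained sketch of the same pattern; the class and the listFast/listSlow methods are hypothetical stand-ins, not Hive APIs:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class TriStateProbe {
  // null = not probed yet, TRUE = fast path supported, FALSE = fall back permanently.
  static final AtomicReference<Boolean> USE_FAST_PATH = new AtomicReference<>(null);

  static List<String> list(String dir) {
    Boolean val = USE_FAST_PATH.get();
    if (val == null || val) {
      try {
        List<String> result = listFast(dir);       // may throw UnsupportedOperationException
        USE_FAST_PATH.compareAndSet(null, true);   // remember that the fast path works
        return result;
      } catch (UnsupportedOperationException e) {
        USE_FAST_PATH.compareAndSet(null, false);  // never probe again
      }
    }
    return listSlow(dir);                          // regular API fallback
  }

  static List<String> listFast(String dir) { throw new UnsupportedOperationException(); }
  static List<String> listSlow(String dir) { return Arrays.asList(dir + "/f1", dir + "/f2"); }
}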
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index e5f3047..aa12ddb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -266,10 +267,16 @@
     // and discovering that in getSplits is too late as we then have no way to pass it to our
     // mapper.
 
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, false, true);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(
+        new Path(sd.getLocation()), conf, writeIds, false, true);
+
+    if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+      return;
+    }
+
     List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
-    int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
-    if(parsedDeltas.size() > maxDeltastoHandle) {
+    int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
+    if (parsedDeltas.size() > maxDeltasToHandle) {
       /**
        * if here, that means we have very high number of delta files. This may be sign of a temporary
        * glitch or a real issue. For example, if transaction batch size or transaction size is set too
@@ -282,13 +289,13 @@
           + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, "
           + "especially if this message repeats. Check that compaction is running properly. Check for any "
           + "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
-      int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
-      for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
+      int numMinorCompactions = parsedDeltas.size() / maxDeltasToHandle;
+      for (int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
         JobConf jobMinorCompact =
             createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, writeIds, ci);
         launchCompactionJob(jobMinorCompact,
             null, CompactionType.MINOR, null,
-            parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-            maxDeltastoHandle, -1, conf, msc, ci.id, jobName);
+            parsedDeltas.subList(jobSubId * maxDeltasToHandle, (jobSubId + 1) * maxDeltasToHandle),
+            maxDeltasToHandle, -1, conf, msc, ci.id, jobName);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
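The batching above is plain integer arithmetic: parsedDeltas.size() / maxDeltasToHandle full batches, each job receiving exactly maxDeltasToHandle deltas via subList, with any remainder left for the recomputed directory state afterwards. A runnable sketch with strings standing in for ParsedDelta; the names here are illustrative:

import java.util.ArrayList;
import java.util.List;

public class DeltaBatching {
  public static void main(String[] args) {
    List<String> parsedDeltas = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
      parsedDeltas.add("delta_" + i + "_" + i);
    }
    int maxDeltasToHandle = 3;
    // 10 / 3 = 3 full minor-compaction batches; the 10th delta is left over and
    // picked up when the directory state is recomputed after these jobs finish.
    int numMinorCompactions = parsedDeltas.size() / maxDeltasToHandle;
    for (int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
      List<String> batch = parsedDeltas.subList(
          jobSubId * maxDeltasToHandle, (jobSubId + 1) * maxDeltasToHandle);
      System.out.println("job_" + jobSubId + " compacts " + batch);
    }
  }
}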
@@ -319,17 +326,6 @@
         dirsToSearch.add(baseDir);
       }
     }
-    if (parsedDeltas.size() == 0 && dir.getOriginalFiles().size() == 0) {
-      // Skip compaction if there's no delta files AND there's no original files
-      String minOpenInfo = ".";
-      if(writeIds.getMinOpenWriteId() != null) {
-        minOpenInfo = " with min Open " + JavaUtils.writeIdToString(writeIds.getMinOpenWriteId()) +
-            ". Compaction cannot compact above this writeId";
-      }
-      LOG.error("No delta files or original files found to compact in " + sd.getLocation() +
-          " for compactionId=" + ci.id + minOpenInfo);
-      return;
-    }
 
     launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
         dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, msc, ci.id, jobName);
@@ -345,16 +341,14 @@
   private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd,
       ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
     AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds,
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds,
         Ref.from(false), false, t.getParameters());
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
-      LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(), dir
-          .getBaseDirectory(), deltaCount, origCount);
+
+    if (!isEnoughToCompact(dir, sd)) {
       return;
     }
+
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
     // Set up the session for driver.
@@ -412,11 +406,47 @@
     }
   }
 
+  private static boolean isEnoughToCompact(AcidUtils.Directory dir, StorageDescriptor sd) {
+    return isEnoughToCompact(true, dir, sd);
+  }
+
+  private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
+    int deltaCount = dir.getCurrentDirectories().size();
+    int origCount = dir.getOriginalFiles().size();
+
+    StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
+    boolean isEnoughToCompact;
+
+    if (isMajorCompaction) {
+      isEnoughToCompact = (origCount > 0
+          || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
+
+    } else {
+      isEnoughToCompact = (deltaCount > 1);
+
+      if (deltaCount == 2) {
+        Map<String, Long> deltaByType = dir.getCurrentDirectories().stream()
+            .collect(Collectors.groupingBy(delta ->
+                    (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
+                Collectors.counting()));
+
+        isEnoughToCompact = (deltaByType.size() != deltaCount);
+        deltaInfo.append(" ").append(deltaByType);
+      }
+    }
+
+    if (!isEnoughToCompact) {
+      LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}",
+          sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount);
+    }
+    return isEnoughToCompact;
+  }
+
   private void runMmCompaction(HiveConf conf, Table t, Partition p, StorageDescriptor sd,
       ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
     LOG.debug("Going to delete directories for aborted transactions for MM table "
         + t.getDbName() + "." + t.getTableName());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()),
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
         conf, writeIds, Ref.from(false), false, t.getParameters());
     removeFilesForMmTable(conf, dir);
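isEnoughToCompact above is the core of this fix. For a major compaction there must be original files, or more than one directory counting the base and the deltas; for a minor compaction there must be more than one delta, and exactly two deltas only qualify when they share a prefix, since a matching delta_/delete_delta_ pair from a single transaction has nothing to merge. A standalone restatement of those rules on plain directory names; this is illustrative, not the committed method:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CompactionGate {
  static boolean isEnoughToCompact(boolean isMajor, boolean hasBase,
      List<String> deltaDirs, int origCount) {
    int deltaCount = deltaDirs.size();
    if (isMajor) {
      // Major compaction: need originals, or more than one of {base + deltas}.
      return origCount > 0 || deltaCount + (hasBase ? 1 : 0) > 1;
    }
    if (deltaCount == 2) {
      // Exactly two deltas only qualify when both have the same prefix.
      Map<String, Long> byType = deltaDirs.stream().collect(Collectors.groupingBy(
          d -> d.startsWith("delete_delta_") ? "delete_delta_" : "delta_",
          Collectors.counting()));
      return byType.size() != deltaCount;
    }
    return deltaCount > 1;
  }

  public static void main(String[] args) {
    System.out.println(isEnoughToCompact(false, false,
        Arrays.asList("delta_1_1", "delete_delta_1_1"), 0)); // false: matching pair
    System.out.println(isEnoughToCompact(false, false,
        Arrays.asList("delta_1_1", "delta_2_2"), 0));        // true: two insert deltas
    System.out.println(isEnoughToCompact(true, true,
        Arrays.<String>asList(), 0));                        // false: a lone base
  }
}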
@@ -427,14 +457,10 @@
       return;
     }
 
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
-      LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
-          + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas and "
-          + origCount + " originals");
+    if (!isEnoughToCompact(dir, sd)) {
       return;
     }
+
     try {
       String tmpLocation = generateTmpPath(sd);
       Path baseLocation = new Path(tmpLocation, "_base");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 0734ed9..d4abf42 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -953,7 +953,7 @@
   public void testEmptyInTblproperties() throws Exception {
     runStatementOnDriver("create table t1 " + "(a int, b int) stored as orc TBLPROPERTIES ('serialization.null.format'='', 'transactional'='true')");
     runStatementOnDriver("insert into t1 " + "(a,b) values(1,7),(3,7)");
-    runStatementOnDriver("update t1" + " set b = -2 where b = 2");
+    runStatementOnDriver("update t1" + " set b = -2 where a = 1");
     runStatementOnDriver("alter table t1 " + " compact 'MAJOR'");
     runWorker(hiveConf);
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index d9e4468..553addb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -351,25 +351,17 @@
     Assert.assertEquals(1, compacts.size());
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
 
-    // There should still now be 5 directories in the location
+    // There should still be 4 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(toString(stat),6 , stat.length);
+    Assert.assertEquals(toString(stat), 4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    /**
-     * this may look a bit odd. Compactor is capped at min open write id which is 23 in this case
-     * so the minor compaction above only 1 dir as input, delta_21_22 and outputs
-     * delta_21_22_v28 (and matching delete_delta) (HIVE-9995/HIVE-20901)
-     */
-    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22) + "_v0000028",
-        stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22) + "_v0000028", stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
   }
 
   @Test
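The updated assertions reflect the fix: delta_21_22, the only delta below the min open write id of 23, is no longer minor-compacted on its own, so the partition keeps four directories instead of six. A sketch of that layout, assuming the zero-padded delta_%07d_%07d naming behind makeDeltaDirName; the helper below is a hypothetical stand-in:

import java.util.Arrays;

public class ExpectedLayout {
  static String deltaDirName(long min, long max) {
    return String.format("delta_%07d_%07d", min, max); // assumed format
  }

  public static void main(String[] args) {
    String[] dirs = {
        "base_20",            // major-compacted history up to write id 20
        deltaDirName(21, 22), // left alone: a single delta is not enough to compact
        deltaDirName(23, 25),
        deltaDirName(26, 27),
    };
    Arrays.sort(dirs);
    System.out.println(Arrays.toString(dirs)); // 4 entries, matching the new assertions
  }
}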