This is an automated email from the ASF dual-hosted git repository. prozsa pushed a commit to branch branch-4.5.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 48e50184e3c040ef46b0af88323dd6ce36bd9215
Author: Csaba Ringhofer <csringho...@cloudera.com>
AuthorDate: Wed Feb 19 13:32:56 2025 +0100

IMPALA-13759: Fix Hive ACID INSERT OVERWRITE base detection

Base directories created by INSERT OVERWRITE / TRUNCATE should be treated differently from bases created by compaction, because IOW/TRUNCATE bases must be accepted even if there is an earlier open writeId. This scenario can easily occur if there is a pending write to a single partition: such a write doesn't block an IOW/TRUNCATE on another partition, while the global minOpenWriteId still affects whether the base is accepted.

This change updates the Impala logic to consider these bases valid, similarly to Hive.

Note that Impala differentiates IOW/TRUNCATE bases from compacted bases differently than Hive does, as metadata files are not considered in Impala (IMPALA-13769). This can only cause problems when interacting with earlier Hive versions that did not use visibilityTxnId in the base path. I don't consider this to be a significant regression that should block the current critical fix.
Testing: - added regression EE/FE tests Change-Id: I838eaf4f41bae148e558f64288a1370c0908efa4 Reviewed-on: http://gerrit.cloudera.org:8080/22499 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- .../java/org/apache/impala/util/AcidUtils.java | 20 +++++++++++++++-- .../java/org/apache/impala/util/AcidUtilsTest.java | 18 ++++++++++++++++ tests/query_test/test_acid.py | 25 ++++++++++++++++++++++ 3 files changed, 61 insertions(+), 2 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java index c8efe788b..ca767b899 100644 --- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java +++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java @@ -257,9 +257,25 @@ public class AcidUtils { public boolean check(String dirPath) throws CatalogException { ParsedBase parsedBase = parseBase(dirPath); + // The logic below is not completely in sync with the way Hive works currently: + // https://github.com/apache/hive/blob/0759352ddddc793c0e717c460f0e08eb3f14c1e9/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#L1774-L1797 + // The main difference is that metadata files are ignored, so Impala cannot + // differentiate between bases created by INSERT OVERWRITE / TRUNCATE and compacted + // bases created by older Hives (that did not use visibilityTxnId). See IMPALA-13759 + // for more info. if (parsedBase.writeId != SENTINEL_BASE_WRITE_ID) { - boolean isValid = writeIdList.isValidBase(parsedBase.writeId) && - isTxnValid(parsedBase.visibilityTxnId); + boolean isValid = false; + if (parsedBase.visibilityTxnId != -1) { + // Assume that the base is produced by a compaction. + isValid = writeIdList.isValidBase(parsedBase.writeId) && + isTxnValid(parsedBase.visibilityTxnId); + } else { + // Assume that the base is produced by INSERT OVERWRITE or TRUNCATE.
It is + // possible for isValidBase() to fail in this case, for example if there is + // an earlier open writeId in a different partition, so only check whether + // the writeId is committed. + isValid = writeIdList.isWriteIdValid(parsedBase.writeId); + } if (doStrictCheck && !isValid) { throw new CatalogException("Invalid base file found " + dirPath); } diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java index 6a17b82d2..04644bf94 100644 --- a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java +++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java @@ -597,6 +597,24 @@ public class AcidUtilsTest { "delta_0000012_0000012_0000/0000_1"}); } + @Test + public void testOpenWriteIdBeforeInsertOverwrite() { + assertFiltering(new String[]{ + "base_000001/", + "base_000001/0000_0", + "delta_000002_000002_0000/", + "delta_000002_000002_0000/0000", + "base_000003/", + "base_000003/0000_0"}, + // <tbl>:<hwm>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds> + "default.test:3:2:1:", + new String[]{ + // As there is no visibilityTxnId, it is assumed that base_000003 was produced + // by an insert overwrite and is accepted even though there is an earlier open + // write id (regression test for IMPALA-13759). 
+ "base_000003/0000_0"}); + } + @Test public void testWriteIdListCompare() { ValidWriteIdList a = diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py index 98931119f..860aee168 100644 --- a/tests/query_test/test_acid.py +++ b/tests/query_test/test_acid.py @@ -231,6 +231,12 @@ class TestAcid(ImpalaTestSuite): commit_req.txnid = txn_id return self.hive_client.commit_txn(commit_req) + def _create_insert_only_acid_table(self, tbl, cols, part_cols=None): + part_part = ("partitioned by (%s)" % part_cols) if part_cols else "" + self.execute_query("create table {} ({}) {} tblproperties" + "('transactional'='true'," + "'transactional_properties'='insert_only')".format(tbl, cols, part_part)) + @SkipIfFS.hive def test_lock_timings(self, vector, unique_database): def elapsed_time_for_query(query): @@ -360,3 +366,22 @@ class TestAcid(ImpalaTestSuite): # Create a new table and load it in catalogd. Catalogd should not hang. self.client.execute("CREATE TABLE {0}.tbl (i int)".format(unique_database)) self.client.execute("DESCRIBE {0}.tbl".format(unique_database)) + + def test_insert_overwrite_base_detection(self, unique_database): + """Regression test for IMPALA-13759.
Checks that the base directory created by + INSERT OVERWRITE is detected correctly even if there is an earlier open writeId + in another partition.""" + tbl = unique_database + ".insert_only_table" + self._create_insert_only_acid_table(tbl, "i int", "p string") + sleep_time_ms = 15000 + other_partition_insert_handle = self.execute_query_async( + 'insert into %s partition (p="a") values (sleep(%d))' % (tbl, sleep_time_ms)) + self.execute_query('insert into %s partition (p="b") values (2)' % tbl) + self.execute_query('insert overwrite %s partition (p="b") values (3)' % tbl) + self.execute_query('insert into %s partition (p="b") values (4)' % tbl) + result = self.execute_query("select * from %s order by i" % tbl) + assert result.data == ['3\tb', '4\tb'] + self.client.wait_for_finished_timeout( + other_partition_insert_handle, 0.001 * sleep_time_ms) + result = self.execute_query("select * from %s order by i" % tbl) + assert result.data == ['1\ta', '3\tb', '4\tb']