This is an automated email from the ASF dual-hosted git repository.

prozsa pushed a commit to branch branch-4.5.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 48e50184e3c040ef46b0af88323dd6ce36bd9215
Author: Csaba Ringhofer <csringho...@cloudera.com>
AuthorDate: Wed Feb 19 13:32:56 2025 +0100

    IMPALA-13759: Fix Hive ACID INSERT OVERWRITE base detection
    
    Base directories created by INSERT OVERWRITE / TRUNCATE should be
    treated differently from bases created by compaction, because
    IOW/TRUNCATE bases must be accepted even if there is an earlier
    open writeId. This scenario can easily occur if there is
    a pending write to a single partition, as this doesn't block
    an IOW/TRUNCATE to another partition, while the global
    minOpenWriteId affects whether the base is accepted.
    
    This change updates Impala logic to consider these bases
    valid similarly to Hive.
    
    Note that the way IOW/TRUNCATE bases are told apart from
    compaction bases differs from Hive, as metadata files are not
    considered in Impala (IMPALA-13769). This can only cause problems
    when interacting with earlier Hive versions that did not use
    visibilityTxnId in the base path. I don't consider this
    to be a significant regression that should block the current
    critical fix.
    
    Testing:
    - added regression EE/FE tests
    
    Change-Id: I838eaf4f41bae148e558f64288a1370c0908efa4
    Reviewed-on: http://gerrit.cloudera.org:8080/22499
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../java/org/apache/impala/util/AcidUtils.java     | 20 +++++++++++++++--
 .../java/org/apache/impala/util/AcidUtilsTest.java | 18 ++++++++++++++++
 tests/query_test/test_acid.py                      | 25 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index c8efe788b..ca767b899 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -257,9 +257,25 @@ public class AcidUtils {
 
     public boolean check(String dirPath) throws CatalogException {
       ParsedBase parsedBase = parseBase(dirPath);
+      // The logic below is not completely in sync with the way Hive works currently:
+      // https://github.com/apache/hive/blob/0759352ddddc793c0e717c460f0e08eb3f14c1e9/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java#L1774-L1797
+      // The main difference is that metadata files are ignored, so Impala cannot
+      // differentiate between bases created by INSERT OVERWRITE / TRUNCATE and compacted
+      // bases created by older Hives (that did not use visibilityTxnId). See IMPALA-13759
+      // for more info.
       if (parsedBase.writeId != SENTINEL_BASE_WRITE_ID) {
-        boolean isValid = writeIdList.isValidBase(parsedBase.writeId) &&
-               isTxnValid(parsedBase.visibilityTxnId);
+        boolean isValid = false;
+        if (parsedBase.visibilityTxnId != -1) {
+          // Assume that the base is produced by a compaction.
+          isValid = writeIdList.isValidBase(parsedBase.writeId) &&
+              isTxnValid(parsedBase.visibilityTxnId);
+        } else {
+          // Assume that the base is produced by INSERT OVERWRITE or TRUNCATE. It is
+          // possible for isValidBase() to fail in this case, for example if there is
+          // an earlier open writeId in a different partition, so only check whether
+          // the writeId is committed.
+          isValid = writeIdList.isWriteIdValid(parsedBase.writeId);
+        }
         if (doStrictCheck && !isValid) {
           throw new CatalogException("Invalid base file found " + dirPath);
         }
diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
index 6a17b82d2..04644bf94 100644
--- a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
@@ -597,6 +597,24 @@ public class AcidUtilsTest {
             "delta_0000012_0000012_0000/0000_1"});
   }
 
+  @Test
+  public void testOpenWriteIdBeforeInsertOverwrite() {
+    assertFiltering(new String[]{
+            "base_000001/",
+            "base_000001/0000_0",
+            "delta_000002_000002_0000/",
+            "delta_000002_000002_0000/0000",
+            "base_000003/",
+            "base_000003/0000_0"},
+        // <tbl>:<hwm>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+        "default.test:3:2:1:",
+        new String[]{
+            // As there is no visibilityTxnId, it is assumed that base_000003 was produced
+            // by an insert overwrite and is accepted even though there is an earlier open
+            // write id (regression test for IMPALA-13759).
+            "base_000003/0000_0"});
+  }
+
   @Test
   public void testWriteIdListCompare() {
     ValidWriteIdList a =
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index 98931119f..860aee168 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -231,6 +231,12 @@ class TestAcid(ImpalaTestSuite):
     commit_req.txnid = txn_id
     return self.hive_client.commit_txn(commit_req)
 
+  def _create_insert_only_acid_table(self, tbl, cols, part_cols=None):
+    part_part = ("partitioned by (%s)" % part_cols) if part_cols else ""
+    self.execute_query("create table {} ({}) {} tblproperties"
+        "('transactional'='true',"
+        "'transactional_properties'='insert_only')".format(tbl, cols, part_part))
+
   @SkipIfFS.hive
   def test_lock_timings(self, vector, unique_database):
     def elapsed_time_for_query(query):
@@ -360,3 +366,22 @@ class TestAcid(ImpalaTestSuite):
     # Create a new table and load it in catalogd. Catalogd should not hang.
     self.client.execute("CREATE TABLE {0}.tbl (i int)".format(unique_database))
     self.client.execute("DESCRIBE {0}.tbl".format(unique_database))
+
+  def test_insert_overwrite_base_detection(self, unique_database):
+    """Regression test for IMPALA-13759. Checks that the base directory created by
+    INSERT OVERWRITE is detected correctly even if there is an earlier open writeId
+    in another partition."""
+    tbl = unique_database + ".insert_only_table"
+    self._create_insert_only_acid_table(tbl, "i int", "p string")
+    sleep_time_ms = 15000
+    other_partition_insert_handle = self.execute_query_async(
+        'insert into %s partition (p="a") values (sleep(%d))' % (tbl, sleep_time_ms))
+    self.execute_query('insert into %s partition (p="b") values (2)' % tbl)
+    self.execute_query('insert overwrite %s partition (p="b") values (3)' % tbl)
+    self.execute_query('insert into %s partition (p="b") values (4)' % tbl)
+    result = self.execute_query("select * from %s order by i" % tbl)
+    assert result.data == ['3\tb', '4\tb']
+    self.client.wait_for_finished_timeout(
+        other_partition_insert_handle, 0.001 * sleep_time_ms)
+    result = self.execute_query("select * from %s order by i" % tbl)
+    assert result.data == ['1\ta', '3\tb', '4\tb']

Reply via email to