This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 597dc69a85e HIVE-27388: Backport of HIVE-23058: Compaction task
reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by
Laszlo Pinter)
597dc69a85e is described below
commit 597dc69a85ec487983a2b12af8e29d24fc61ff04
Author: Diksha628 <[email protected]>
AuthorDate: Tue Sep 12 12:34:06 2023 +0530
HIVE-27388: Backport of HIVE-23058: Compaction task reattempt fails with
FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter)
Signed-off-by: Sankar Hariappan <[email protected]>
Closes (#4659)
---
.../hive/ql/txn/compactor/TestCompactor.java | 60 +++++++++++++++++++---
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 13 ++++-
2 files changed, 64 insertions(+), 9 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 0827bcdb695..c0cf05ea3d0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -24,14 +24,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1602,6 +1595,57 @@ public class TestCompactor {
0L, 0L, 1);
}
+  /**
+   * Regression test for HIVE-23058: a major compaction whose output bucket file already exists in
+   * the compactor's scratch directory (as if left behind by an earlier, failed task attempt) must
+   * still complete instead of failing with FileAlreadyExistsException.
+   */
+  @Test
+  public void testCompactionForFileInSratchDir() throws Exception {
+    String dbName = "default";
+    String tblName = "cfs";
+    String createQuery = "CREATE TABLE " + tblName + "(a INT, b STRING) "
+        + "STORED AS ORC TBLPROPERTIES ('transactional'='true',"
+        + "'transactional_properties'='default')";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver(createQuery, driver);
+
+    // Two separate inserts -> two insert-only deltas for the major compaction to merge.
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(2, 'bar')", driver);
+
+    // Look up the table location; release the metastore client as soon as we have it.
+    String tableLocation;
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    try {
+      Table table = msClient.getTable(dbName, tblName);
+      tableLocation = table.getSd().getLocation();
+    } finally {
+      msClient.close();
+    }
+
+    // Point the compactor's scratch dir at a known location under the table.
+    // NOTE(review): presumably the "compactor." prefix forwards this property into the
+    // compaction job conf as hive.compactor.input.tmp.dir - confirm against CompactorMR.
+    String scratchDirPath = tableLocation + "/_tmp";
+    Map<String, String> tblProperties = new HashMap<>();
+    tblProperties.put("compactor.hive.compactor.input.tmp.dir", scratchDirPath);
+
+    // Simulate a previous failed task attempt by leaving a bucket file where the compactor
+    // will write its output.
+    // NOTE(review): the "_v0000005" visibility suffix mirrors the upstream (master) test. Confirm
+    // that branch-3 major compaction writes to this exact directory; otherwise the file never
+    // collides and this test passes without exercising the retry cleanup.
+    Path dir = new Path(scratchDirPath + "/base_0000002_v0000005");
+    FileSystem fs = dir.getFileSystem(conf);
+    fs.mkdirs(dir);
+    Path emptyFile = AcidUtils.createBucketFile(dir, 0);
+    // Only the file's existence matters; close the stream immediately so it is not leaked.
+    fs.create(emptyFile).close();
+
+    // Run a major compaction with the scratch-dir override.
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(conf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    rqst.setProperties(tblProperties);
+    txnHandler.compact(rqst);
+    t.run();
+
+    // The compaction must succeed despite the leftover file and move on to the cleaning phase.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+  }
+
@Test
public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
String dbName = "default";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index d7e661bcd26..e3ceb3af055 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -1028,7 +1028,18 @@ public class CompactorMR {
AcidOutputFormat<WritableComparable, V> aof =
instantiate(AcidOutputFormat.class,
jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
- writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)),
options);
+ Path rootDir = new Path(jobConf.get(TMP_LOCATION));
+ cleanupTmpLocationOnTaskRetry(options, rootDir);
+ writer = aof.getRawRecordWriter(rootDir, options);
+ }
+ }
+
+    /**
+     * Removes an output file that a previous attempt of this task may have left in the
+     * compactor's temporary location. Without this, creating the raw record writer on a task
+     * retry fails with FileAlreadyExistsException (HIVE-23058).
+     *
+     * @param options writer options; together with {@code rootDir} they determine the file name
+     *     via {@code AcidUtils.createFilename}
+     * @param rootDir the compactor's temporary output directory
+     * @throws IOException if a leftover file exists but could not be deleted, or on any other
+     *     file system error
+     */
+    private void cleanupTmpLocationOnTaskRetry(AcidOutputFormat.Options options, Path rootDir)
+        throws IOException {
+      Path tmpLocation = AcidUtils.createFilename(rootDir, options);
+      FileSystem fs = tmpLocation.getFileSystem(jobConf);
+
+      // FileSystem.delete() reports failure through its return value, not an exception. Surface
+      // it here rather than letting the writer fail later with a misleading
+      // FileAlreadyExistsException.
+      if (fs.exists(tmpLocation) && !fs.delete(tmpLocation, true)) {
+        throw new IOException("Unable to delete " + tmpLocation
+            + " left behind by a previous attempt of this compaction task");
       }
     }