This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3ae1704b0d [hive] Bug fix for stuck thread on hive prepareCommit failure (#8142)
3ae1704b0d is described below
commit 3ae1704b0d217a5e9f73722545680efd800feeac
Author: Arnav Balyan <[email protected]>
AuthorDate: Mon Jun 8 17:12:23 2026 +0530
[hive] Bug fix for stuck thread on hive prepareCommit failure (#8142)
- Today, any Hive job writing to a Paimon table that fails before
prepareCommit gets permanently stuck instead of exiting cleanly.
- This happens because Paimon throws a second exception during cleanup
(abortJob) when the pre-commit files it tries to read were never written.
This unchecked RuntimeException is not caught by the caller.
- As a result, the parent thread stays blocked forever, waiting for the job
failure to arrive.
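- This change guards pre-commit file access with an existence check. A
missing pre-commit file is skipped (with a warning) during abort, and is
reported as an explicit error during commit, so the missing-file case no
longer surfaces as an unhandled read failure.
- This also fixes CI hanging until it times out in some cases when a unit
test fails.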
---
.../paimon/hive/mapred/PaimonOutputCommitter.java | 34 +++++--
.../hive/mapred/PaimonOutputCommitterTest.java | 105 +++++++++++++++++++++
2 files changed, 131 insertions(+), 8 deletions(-)
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
index 94bc4a675a..0fd37beb93 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
@@ -144,7 +144,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
if (table != null) {
BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
List<CommitMessage> commitMessagesList =
- getAllPreCommitMessage(table.location(), jobContext,
table.fileIO());
+ getAllPreCommitMessage(table.location(), jobContext,
table.fileIO(), false);
try (BatchTableCommit batchTableCommit =
batchWriteBuilder.newCommit()) {
batchTableCommit.commit(commitMessagesList);
} catch (Exception e) {
@@ -172,7 +172,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
LOG.info("AbortJob {} has started", jobContext.getJobID());
List<CommitMessage> commitMessagesList =
- getAllPreCommitMessage(table.location(), jobContext,
table.fileIO());
+ getAllPreCommitMessage(table.location(), jobContext,
table.fileIO(), true);
BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
try (BatchTableCommit batchTableCommit =
batchWriteBuilder.newCommit()) {
batchTableCommit.abort(commitMessagesList);
@@ -214,7 +214,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
* @return The list of the committed data files
*/
private static List<CommitMessage> getAllPreCommitMessage(
- Path location, JobContext jobContext, FileIO io) {
+ Path location, JobContext jobContext, FileIO io, boolean
ignoreMissing) {
JobConf conf = jobContext.getJobConf();
int totalCommitMessagesSize =
@@ -225,7 +225,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
for (int i = 0; i < totalCommitMessagesSize; i++) {
Path commitFileLocation =
generatePreCommitFileLocation(location,
jobContext.getJobID(), i);
- commitMessagesList.addAll(readPreCommitFile(commitFileLocation,
io));
+ commitMessagesList.addAll(readPreCommitFile(commitFileLocation,
io, ignoreMissing));
}
return commitMessagesList;
@@ -270,10 +270,28 @@ public class PaimonOutputCommitter extends OutputCommitter {
}
}
- private static List<CommitMessage> readPreCommitFile(Path location, FileIO
io) {
- try (ObjectInputStream objectInputStream =
- new ObjectInputStream(io.newInputStream(location))) {
- return (List<CommitMessage>) objectInputStream.readObject();
+ private static List<CommitMessage> readPreCommitFile(
+ Path location, FileIO io, boolean ignoreMissing) {
+ try {
+ if (!io.exists(location)) {
+ if (ignoreMissing) {
+ LOG.warn(
+ "preCommit file {} was not generated. The task did
not "
+ + "reach prepareCommit, so there are no
commit "
+ + "messages to abort. Skipping commit
cleanup "
+ + "for this task slot.",
+ location);
+ return Collections.emptyList();
+ }
+ throw new RuntimeException(
+ String.format(
+ "preCommit file %s is missing during commit.
Refusing to commit a partial result.",
+ location));
+ }
+ try (ObjectInputStream objectInputStream =
+ new ObjectInputStream(io.newInputStream(location))) {
+ return (List<CommitMessage>) objectInputStream.readObject();
+ }
} catch (ClassNotFoundException | IOException e) {
throw new RuntimeException(
String.format("Can not read or parse CommitMessage file:
%s", location));
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputCommitterTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputCommitterTest.java
new file mode 100644
index 0000000000..491a422af6
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/mapred/PaimonOutputCommitterTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.mapred;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link PaimonOutputCommitter}.
+ *
+ * <p>These tests pin the behaviour of {@code readPreCommitFile} when a pre-commit file is missing,
+ * corrupt, or valid. The method under test is private, so it is invoked reflectively.
+ */
+public class PaimonOutputCommitterTest {
+
+    private static final String PRE_COMMIT_FILE_NAME = "task_0.preCommit";
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void readPreCommitFileReturnsEmptyWhenFileMissingAndIgnoreMissing() throws Exception {
+        // Lenient mode (used by abortJob): a task that never reached prepareCommit is skipped.
+        Path missing = new Path(folder.newFolder().getAbsolutePath(), PRE_COMMIT_FILE_NAME);
+        List<CommitMessage> result = invokeReadPreCommitFile(missing, true);
+        assertThat(result).isEmpty();
+    }
+
+    @Test
+    public void readPreCommitFileThrowsWhenFileMissingAndStrict() throws Exception {
+        // Strict mode (used by commitJob): committing without every task's messages must fail.
+        Path missing = new Path(folder.newFolder().getAbsolutePath(), PRE_COMMIT_FILE_NAME);
+        assertThatThrownBy(() -> invokeReadPreCommitFile(missing, false))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("missing during commit");
+    }
+
+    @Test
+    public void readPreCommitFileThrowsOnCorruptFile() throws Exception {
+        File file = folder.newFile(PRE_COMMIT_FILE_NAME);
+        try (FileOutputStream out = new FileOutputStream(file)) {
+            // Not a valid Java serialization stream header, so deserialization must fail.
+            out.write(new byte[] {0x00, 0x01, 0x02, 0x03});
+        }
+        Path corrupt = new Path(file.getAbsolutePath());
+        // ignoreMissing only tolerates absent files; an existing but unreadable file must fail
+        // in both lenient (abort) and strict (commit) mode.
+        for (boolean ignoreMissing : new boolean[] {true, false}) {
+            assertThatThrownBy(() -> invokeReadPreCommitFile(corrupt, ignoreMissing))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining("Can not read or parse CommitMessage file");
+        }
+    }
+
+    @Test
+    public void readPreCommitFileReturnsListForValidFile() throws Exception {
+        File file = folder.newFile(PRE_COMMIT_FILE_NAME);
+        // Declare both streams as resources so the FileOutputStream is closed even if the
+        // ObjectOutputStream constructor (which writes the stream header) throws.
+        try (FileOutputStream out = new FileOutputStream(file);
+                ObjectOutputStream objectOut = new ObjectOutputStream(out)) {
+            objectOut.writeObject(Collections.<CommitMessage>emptyList());
+        }
+        Path written = new Path(file.getAbsolutePath());
+        List<CommitMessage> result = invokeReadPreCommitFile(written, false);
+        assertThat(result).isNotNull().isEmpty();
+    }
+
+    /**
+     * Invokes the private static {@code PaimonOutputCommitter#readPreCommitFile(Path, FileIO,
+     * boolean)} against the local file system.
+     *
+     * <p>Exceptions thrown by the target are unwrapped from {@link InvocationTargetException} so
+     * that assertions see the original exception type and message.
+     */
+    @SuppressWarnings("unchecked")
+    private static List<CommitMessage> invokeReadPreCommitFile(Path location, boolean ignoreMissing)
+            throws Exception {
+        Method method =
+                PaimonOutputCommitter.class.getDeclaredMethod(
+                        "readPreCommitFile", Path.class, FileIO.class, boolean.class);
+        method.setAccessible(true);
+        try {
+            return (List<CommitMessage>)
+                    method.invoke(null, location, LocalFileIO.create(), ignoreMissing);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            if (cause instanceof Error) {
+                throw (Error) cause;
+            }
+            throw new RuntimeException(cause);
+        }
+    }
+}