This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4c0023ff5 [core] ConsumerManager should use atomic overwrite for HDFS
(#2166)
4c0023ff5 is described below
commit 4c0023ff5a7bc4cc7f91945e54a2bf6732485c76
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 25 21:22:52 2023 +0800
[core] ConsumerManager should use atomic overwrite for HDFS (#2166)
---
.../src/main/java/org/apache/paimon/fs/FileIO.java | 41 ++++++++-
.../src/main/java/org/apache/paimon/fs/Path.java | 6 ++
.../org/apache/paimon/fs/hadoop/HadoopFileIO.java | 96 ++++++++++++++++++++++
.../org/apache/paimon/utils/ReflectionUtils.java | 17 ++++
.../apache/paimon/fs/FileIOBehaviorTestBase.java | 2 +-
.../test/java/org/apache/paimon/fs/FileIOTest.java | 50 +++++++++++
.../org/apache/paimon/fs/HdfsBehaviorTest.java | 18 ++++
.../java/org/apache/paimon/consumer/Consumer.java | 32 ++------
.../apache/paimon/consumer/ConsumerManager.java | 9 +-
paimon-spark/paimon-spark-common/pom.xml | 7 ++
.../org/apache/paimon/spark/SparkS3ITCase.java | 20 ++++-
11 files changed, 260 insertions(+), 38 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 819d70223..a5e31ccd5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -42,9 +42,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
-import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.paimon.fs.FileIOUtils.checkAccess;
@@ -204,7 +204,7 @@ public interface FileIO extends Serializable {
return false;
}
- Path tmp = new Path(path.getParent(), "." + path.getName() +
UUID.randomUUID());
+ Path tmp = path.createTempPath();
boolean success = false;
try {
try (PositionOutputStream out = newOutputStream(tmp, false)) {
@@ -223,6 +223,43 @@ public interface FileIO extends Serializable {
return success;
}
+    /**
+     * Replaces the content of {@code path} with {@code content}, encoded as UTF-8.
+     *
+     * <p>The overwrite is meant to be atomic; how that is achieved is left to each {@link FileIO}
+     * implementation. This default opens the target with {@link #newOutputStream} in overwrite
+     * mode and writes the encoded content to it.
+     */
+    default void overwriteFileUtf8(Path path, String content) throws IOException {
+        try (PositionOutputStream target = newOutputStream(path, true)) {
+            byte[] encoded = content.getBytes(StandardCharsets.UTF_8);
+            target.write(encoded);
+            target.flush();
+        }
+    }
+
+ /** Read file from {@link #overwriteFileUtf8} file. */
+ default Optional<String> readOverwrittenFileUtf8(Path path) throws
IOException {
+ int retryNumber = 0;
+ IOException exception = null;
+ while (retryNumber++ < 5) {
+ try {
+ if (!exists(path)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(readFileUtf8(path));
+ } catch (IOException e) {
+ if (e.getClass()
+ .getName()
+
.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) {
+ exception = e;
+ } else {
+ throw e;
+ }
+ }
+ }
+
+ throw exception;
+ }
+
//
-------------------------------------------------------------------------
// static creator
//
-------------------------------------------------------------------------
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
index 96b2b9e01..87a9ce7b0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
@@ -28,6 +28,7 @@ import org.apache.paimon.utils.StringUtils;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.UUID;
import java.util.regex.Pattern;
/**
@@ -318,6 +319,11 @@ public class Path implements Comparable<Path>,
Serializable {
return path.substring(slash + 1);
}
+ /** Create a temporary path (to be used as a copy) for this path. */
+ public Path createTempPath() {
+ return new Path(getParent(), String.format(".%s.%s.tmp", getName(),
UUID.randomUUID()));
+ }
+
/**
* Returns the parent of a path or null if at root.
*
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index dd15b5280..7074341a7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -26,12 +26,19 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.hadoop.SerializableConfiguration;
+import org.apache.paimon.utils.ReflectionUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicReference;
/** Hadoop {@link FileIO}. */
public class HadoopFileIO implements FileIO {
@@ -116,6 +123,14 @@ public class HadoopFileIO implements FileIO {
return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
}
+    @Override
+    public void overwriteFileUtf8(Path path, String content) throws IOException {
+        // Prefer the rename-based overwrite; use the interface default only when it reports
+        // that it could not be applied.
+        if (tryAtomicOverwriteViaRename(path, content)) {
+            return;
+        }
+        FileIO.super.overwriteFileUtf8(path, content);
+    }
+
private org.apache.hadoop.fs.Path path(Path path) {
return new org.apache.hadoop.fs.Path(path.toUri());
}
@@ -290,4 +305,85 @@ public class HadoopFileIO implements FileIO {
return status.getModificationTime();
}
}
+
+ // ============================== extra methods
===================================
+
+ private transient volatile AtomicReference<Method> renameMethodRef;
+
+ public boolean tryAtomicOverwriteViaRename(Path dst, String content)
throws IOException {
+ org.apache.hadoop.fs.Path hadoopDst = path(dst);
+ FileSystem fs = getFileSystem(hadoopDst);
+
+ if (renameMethodRef == null) {
+ synchronized (this) {
+ if (renameMethodRef == null) {
+ Method method;
+ // Implementation in FileSystem is incorrect, not atomic,
Object Storage like S3
+ // and OSS not override it
+ // DistributedFileSystem and ViewFileSystem override the
rename method to public
+ // and implement correct renaming
+ try {
+ method = ReflectionUtils.getMethod(fs.getClass(),
"rename", 3);
+ } catch (NoSuchMethodException e) {
+ method = null;
+ }
+ renameMethodRef = new AtomicReference<>(method);
+ }
+ }
+ }
+
+ Method renameMethod = renameMethodRef.get();
+ if (renameMethod == null) {
+ return false;
+ }
+
+ boolean renameDone = false;
+
+ // write tempPath
+ Path tempPath = dst.createTempPath();
+ org.apache.hadoop.fs.Path hadoopTemp = path(tempPath);
+ try {
+ try (PositionOutputStream out = newOutputStream(tempPath, false)) {
+ OutputStreamWriter writer = new OutputStreamWriter(out,
StandardCharsets.UTF_8);
+ writer.write(content);
+ writer.flush();
+ }
+
+ renameMethod.invoke(
+ fs, hadoopTemp, hadoopDst, new Options.Rename[]
{Options.Rename.OVERWRITE});
+ renameDone = true;
+ // TODO: this is a workaround of HADOOP-16255 - remove this when
HADOOP-16255 is
+ // resolved
+ tryRemoveCrcFile(hadoopTemp);
+ return true;
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new IOException(e);
+ } finally {
+ if (!renameDone) {
+ deleteQuietly(tempPath);
+ }
+ }
+ }
+
+    /**
+     * Removes, on a best-effort basis, the {@code .crc} checksum sibling of {@code path} (same
+     * parent, name prefixed with a dot and suffixed with {@code .crc}).
+     *
+     * @throws IOException if a fatal exception occurs. Will try to ignore most exceptions.
+     */
+    @SuppressWarnings("CatchMayIgnoreException")
+    private void tryRemoveCrcFile(org.apache.hadoop.fs.Path path) throws IOException {
+        try {
+            String crcName = "." + path.getName() + ".crc";
+            final org.apache.hadoop.fs.Path crcFile =
+                    new org.apache.hadoop.fs.Path(path.getParent(), crcName);
+
+            if (fs.exists(crcFile)) {
+                // checksum file exists, deleting it
+                fs.delete(crcFile, true); // recursive=true
+            }
+        } catch (VirtualMachineError | ThreadDeath | LinkageError fatal) {
+            // These are never safe to swallow.
+            throw fatal;
+        } catch (Throwable ignored) {
+            // ignore - we are removing crc file as "best-effort"
+        }
+    }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java
index bf14043f3..d523b099d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java
@@ -47,4 +47,21 @@ public class ReflectionUtils {
throws InvocationTargetException, IllegalAccessException {
return (T) method.invoke(null, args);
}
+
+    /**
+     * Returns the first method reported by {@link Class#getMethods()} on {@code clz} that has the
+     * given name and the given number of parameters.
+     *
+     * @throws NoSuchMethodException if no method matches; the message is the method name
+     */
+    public static Method getMethod(Class<?> clz, String methodName, int argSize)
+            throws NoSuchMethodException {
+        for (Method candidate : clz.getMethods()) {
+            if (methodName.equals(candidate.getName())
+                    && candidate.getParameterCount() == argSize) {
+                return candidate;
+            }
+        }
+
+        throw new NoSuchMethodException(methodName);
+    }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
index 3b6ae7ac3..106dee38e 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
@@ -227,7 +227,7 @@ public abstract class FileIOBehaviorTestBase {
// Utilities
// ------------------------------------------------------------------------
- private static String randomName() {
+ protected static String randomName() {
return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
}
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
index 67064ff7b..c1b86a0b2 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
@@ -56,4 +58,52 @@ public class FileIOTest {
CatalogContext.create(options));
assertThat(fileIO).isInstanceOf(RequireOptionsFileIOLoader.MyFileIO.class);
}
+
+    /**
+     * Shared concurrency check for {@link FileIO#overwriteFileUtf8} and {@link
+     * FileIO#readOverwrittenFileUtf8}: one thread overwrites {@code file} with the numbers 0 to 10
+     * while another keeps reading it and parsing the content as an integer.
+     *
+     * <p>The check fails if either thread hits an exception, including a read whose content does
+     * not parse as an integer. The first recorded failure is the one reported.
+     */
+    public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws InterruptedException {
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        final int max = 10;
+
+        Thread writeThread =
+                new Thread(
+                        () -> {
+                            for (int i = 0; i <= max; i++) {
+                                try {
+                                    fileIO.overwriteFileUtf8(file, "" + i);
+                                    Thread.sleep(100);
+                                } catch (Exception e) {
+                                    exception.compareAndSet(null, e);
+                                    return;
+                                }
+                            }
+                        });
+
+        Thread readThread =
+                new Thread(
+                        () -> {
+                            // Stop polling once any failure is recorded: if the writer died
+                            // early the final value never appears, and an unconditional loop
+                            // would hang the join below forever.
+                            while (exception.get() == null) {
+                                try {
+                                    Optional<String> ret = fileIO.readOverwrittenFileUtf8(file);
+                                    if (!ret.isPresent()) {
+                                        continue;
+                                    }
+
+                                    int value = Integer.parseInt(ret.get());
+                                    if (value == max) {
+                                        return;
+                                    }
+                                } catch (Exception e) {
+                                    exception.compareAndSet(null, e);
+                                    return;
+                                }
+                            }
+                        });
+
+        writeThread.start();
+        readThread.start();
+
+        writeThread.join();
+        readThread.join();
+
+        assertThat(exception.get()).isNull();
+    }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
index 1939d98a9..df5eb7ba7 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
+import java.io.IOException;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
/** Behavior tests for HDFS. */
@@ -84,4 +87,19 @@ class HdfsBehaviorTest extends FileIOBehaviorTestBase {
protected Path getBasePath() {
return basePath;
}
+
+    /** Writes two successive contents through the rename-based overwrite and reads each back. */
+    @Test
+    public void testAtomicWrite() throws IOException {
+        Path target = new Path(getBasePath(), randomName());
+        for (String content : new String[] {"Hi", "Hello"}) {
+            fs.tryAtomicOverwriteViaRename(target, content);
+            assertThat(fs.readFileUtf8(target)).isEqualTo(content);
+        }
+    }
+
+ @Test
+ public void testAtomicWriteMultipleThreads() throws InterruptedException {
+ FileIOTest.testOverwriteFileUtf8(new Path(getBasePath(),
randomName()), fs);
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
index 1d0bda823..0aad5bd63 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
@@ -26,8 +26,9 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCre
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
/** Consumer which contains next snapshot. */
public class Consumer {
@@ -58,31 +59,10 @@ public class Consumer {
}
public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
- // Consumer updating uses FileIO.newOutputStream(..., overwrite).
- // But this API may have some intermediate state, the file maybe empty
- // So retry here to avoid exception when the file is intermediate state
- int retryNumber = 0;
- Exception exception = null;
- while (retryNumber++ < READ_CONSUMER_RETRY_NUM) {
- try {
- if (!fileIO.exists(path)) {
- return Optional.empty();
- }
-
- String json = fileIO.readFileUtf8(path);
- return Optional.of(Consumer.fromJson(json));
- } catch (Exception e) {
- exception = e;
- }
-
- try {
- TimeUnit.MILLISECONDS.sleep(READ_CONSUMER_RETRY_INTERVAL);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
+ try {
+ return
fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
}
-
- throw new RuntimeException("Fails to read snapshot from path " + path,
exception);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index b788c4a28..6a928b81c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -20,14 +20,11 @@ package org.apache.paimon.consumer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.utils.DateTimeUtils;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.UncheckedIOException;
-import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
@@ -59,10 +56,8 @@ public class ConsumerManager implements Serializable {
}
public void resetConsumer(String consumerId, Consumer consumer) {
- try (PositionOutputStream out =
fileIO.newOutputStream(consumerPath(consumerId), true)) {
- OutputStreamWriter writer = new OutputStreamWriter(out,
StandardCharsets.UTF_8);
- writer.write(consumer.toJson());
- writer.flush();
+ try {
+ fileIO.overwriteFileUtf8(consumerPath(consumerId),
consumer.toJson());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
diff --git a/paimon-spark/paimon-spark-common/pom.xml
b/paimon-spark/paimon-spark-common/pom.xml
index a7491df29..140d3c77d 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -178,6 +178,13 @@ under the License.
<version>3.1.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-hive-common</artifactId>
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java
index 7a27d7d03..22c9f7175 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java
@@ -18,7 +18,11 @@
package org.apache.paimon.spark;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOTest;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.s3.MinioTestContainer;
import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
import org.apache.paimon.testutils.junit.parameterized.Parameters;
@@ -32,6 +36,7 @@ import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -46,12 +51,14 @@ public class SparkS3ITCase {
@RegisterExtension
public static final MinioTestContainer MINIO_CONTAINER = new
MinioTestContainer();
+ private static Path warehousePath;
+
private static SparkSession spark = null;
@BeforeAll
public static void startMetastoreAndSpark() {
String path = MINIO_CONTAINER.getS3UriForDefaultBucket() + "/" +
UUID.randomUUID();
- Path warehousePath = new Path(path);
+ warehousePath = new Path(path);
spark = SparkSession.builder().master("local[2]").getOrCreate();
spark.conf().set("spark.sql.catalog.paimon",
SparkCatalog.class.getName());
spark.conf().set("spark.sql.catalog.paimon.warehouse",
warehousePath.toString());
@@ -83,7 +90,7 @@ public class SparkS3ITCase {
@AfterEach
public void afterEach() {
- spark.sql("DROP TABLE T");
+ spark.sql("DROP TABLE IF EXISTS T");
}
@TestTemplate
@@ -97,4 +104,13 @@ public class SparkS3ITCase {
List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
assertThat(rows.toString()).isEqualTo("[[1,2,3]]");
}
+
+    /** Runs the shared concurrent overwrite/read check against a fresh file in the warehouse. */
+    @TestTemplate
+    public void testS3AtomicWriteMultipleThreads() throws InterruptedException, IOException {
+        Options s3Options = new Options();
+        MINIO_CONTAINER.getS3ConfigOptions().forEach(s3Options::setString);
+
+        Path target = new Path(warehousePath, UUID.randomUUID().toString());
+        FileIO s3FileIO = FileIO.get(target, CatalogContext.create(s3Options));
+        FileIOTest.testOverwriteFileUtf8(target, s3FileIO);
+    }
}