This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c0023ff5 [core] ConsumerManager should use atomic overwrite for HDFS 
(#2166)
4c0023ff5 is described below

commit 4c0023ff5a7bc4cc7f91945e54a2bf6732485c76
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 25 21:22:52 2023 +0800

    [core] ConsumerManager should use atomic overwrite for HDFS (#2166)
---
 .../src/main/java/org/apache/paimon/fs/FileIO.java | 41 ++++++++-
 .../src/main/java/org/apache/paimon/fs/Path.java   |  6 ++
 .../org/apache/paimon/fs/hadoop/HadoopFileIO.java  | 96 ++++++++++++++++++++++
 .../org/apache/paimon/utils/ReflectionUtils.java   | 17 ++++
 .../apache/paimon/fs/FileIOBehaviorTestBase.java   |  2 +-
 .../test/java/org/apache/paimon/fs/FileIOTest.java | 50 +++++++++++
 .../org/apache/paimon/fs/HdfsBehaviorTest.java     | 18 ++++
 .../java/org/apache/paimon/consumer/Consumer.java  | 32 ++------
 .../apache/paimon/consumer/ConsumerManager.java    |  9 +-
 paimon-spark/paimon-spark-common/pom.xml           |  7 ++
 .../org/apache/paimon/spark/SparkS3ITCase.java     | 20 ++++-
 11 files changed, 260 insertions(+), 38 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 819d70223..a5e31ccd5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -42,9 +42,9 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.ServiceLoader;
 import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.fs.FileIOUtils.checkAccess;
@@ -204,7 +204,7 @@ public interface FileIO extends Serializable {
             return false;
         }
 
-        Path tmp = new Path(path.getParent(), "." + path.getName() + 
UUID.randomUUID());
+        Path tmp = path.createTempPath();
         boolean success = false;
         try {
             try (PositionOutputStream out = newOutputStream(tmp, false)) {
@@ -223,6 +223,43 @@ public interface FileIO extends Serializable {
         return success;
     }
 
+    /**
+     * Overwrites the file at {@code path} with {@code content}, encoded as UTF-8.
+     *
+     * <p>Different {@link FileIO}s have different atomic implementations. This default
+     * implementation only opens the path with {@code newOutputStream(path, true)} and writes the
+     * content in place; it adds no temporary file or rename step of its own.
+     *
+     * @param path file to create or replace
+     * @param content new content of the file
+     * @throws IOException if the stream cannot be opened, written or closed
+     */
+    default void overwriteFileUtf8(Path path, String content) throws 
IOException {
+        try (PositionOutputStream out = newOutputStream(path, true)) {
+            OutputStreamWriter writer = new OutputStreamWriter(out, 
StandardCharsets.UTF_8);
+            writer.write(content);
+            // push the writer's buffered bytes into 'out' before try-with-resources closes it
+            writer.flush();
+        }
+    }
+
+    /**
+     * Reads, as UTF-8, a file that is maintained with {@link #overwriteFileUtf8}.
+     *
+     * <p>Makes up to five attempts, one right after the other (there is no delay between them).
+     * An attempt returns an empty {@link Optional} if the file does not exist, otherwise the file
+     * content. An {@link IOException} whose class name ends with {@code
+     * org.apache.hadoop.fs.s3a.RemoteFileChangedException} leads to the next attempt; any other
+     * {@link IOException} is rethrown immediately.
+     *
+     * @param path file to read
+     * @return the file content, or empty if the file does not exist
+     * @throws IOException on a non-retryable failure, or the last retryable failure once all
+     *     attempts are used up
+     */
+    default Optional<String> readOverwrittenFileUtf8(Path path) throws 
IOException {
+        int retryNumber = 0;
+        IOException exception = null;
+        while (retryNumber++ < 5) {
+            try {
+                if (!exists(path)) {
+                    return Optional.empty();
+                }
+
+                return Optional.of(readFileUtf8(path));
+            } catch (IOException e) {
+                // the exception type is compared by class name, not with instanceof
+                if (e.getClass()
+                        .getName()
+                        
.endsWith("org.apache.hadoop.fs.s3a.RemoteFileChangedException")) {
+                    exception = e;
+                } else {
+                    throw e;
+                }
+            }
+        }
+
+        // only reached after five retryable failures, so 'exception' is non-null here
+        throw exception;
+    }
+
     // 
-------------------------------------------------------------------------
     //                         static creator
     // 
-------------------------------------------------------------------------
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
index 96b2b9e01..87a9ce7b0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
@@ -28,6 +28,7 @@ import org.apache.paimon.utils.StringUtils;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.UUID;
 import java.util.regex.Pattern;
 
 /**
@@ -318,6 +319,11 @@ public class Path implements Comparable<Path>, 
Serializable {
         return path.substring(slash + 1);
     }
 
+    /**
+     * Creates a temporary path (to be used as a copy) for this path.
+     *
+     * @return a new path under the same parent, named {@code .<name>.<random UUID>.tmp} where
+     *     {@code <name>} is {@link #getName()} of this path
+     */
+    public Path createTempPath() {
+        return new Path(getParent(), String.format(".%s.%s.tmp", getName(), 
UUID.randomUUID()));
+    }
+
     /**
      * Returns the parent of a path or null if at root.
      *
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index dd15b5280..7074341a7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -26,12 +26,19 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.hadoop.SerializableConfiguration;
+import org.apache.paimon.utils.ReflectionUtils;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
 
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicReference;
 
 /** Hadoop {@link FileIO}. */
 public class HadoopFileIO implements FileIO {
@@ -116,6 +123,14 @@ public class HadoopFileIO implements FileIO {
         return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
     }
 
+    /**
+     * Overwrites {@code path} with {@code content}: first tries {@link
+     * #tryAtomicOverwriteViaRename}, and falls back to the default {@link FileIO} implementation
+     * when that attempt returns {@code false}.
+     */
+    @Override
+    public void overwriteFileUtf8(Path path, String content) throws 
IOException {
+        boolean success = tryAtomicOverwriteViaRename(path, content);
+        if (!success) {
+            FileIO.super.overwriteFileUtf8(path, content);
+        }
+    }
+
     private org.apache.hadoop.fs.Path path(Path path) {
         return new org.apache.hadoop.fs.Path(path.toUri());
     }
@@ -290,4 +305,85 @@ public class HadoopFileIO implements FileIO {
             return status.getModificationTime();
         }
     }
+
+    // ============================== extra methods 
===================================
+
+    private transient volatile AtomicReference<Method> renameMethodRef;
+
+    public boolean tryAtomicOverwriteViaRename(Path dst, String content) 
throws IOException {
+        org.apache.hadoop.fs.Path hadoopDst = path(dst);
+        FileSystem fs = getFileSystem(hadoopDst);
+
+        if (renameMethodRef == null) {
+            synchronized (this) {
+                if (renameMethodRef == null) {
+                    Method method;
+                    // Implementation in FileSystem is incorrect, not atomic, 
Object Storage like S3
+                    // and OSS not override it
+                    // DistributedFileSystem and ViewFileSystem override the 
rename method to public
+                    // and implement correct renaming
+                    try {
+                        method = ReflectionUtils.getMethod(fs.getClass(), 
"rename", 3);
+                    } catch (NoSuchMethodException e) {
+                        method = null;
+                    }
+                    renameMethodRef = new AtomicReference<>(method);
+                }
+            }
+        }
+
+        Method renameMethod = renameMethodRef.get();
+        if (renameMethod == null) {
+            return false;
+        }
+
+        boolean renameDone = false;
+
+        // write tempPath
+        Path tempPath = dst.createTempPath();
+        org.apache.hadoop.fs.Path hadoopTemp = path(tempPath);
+        try {
+            try (PositionOutputStream out = newOutputStream(tempPath, false)) {
+                OutputStreamWriter writer = new OutputStreamWriter(out, 
StandardCharsets.UTF_8);
+                writer.write(content);
+                writer.flush();
+            }
+
+            renameMethod.invoke(
+                    fs, hadoopTemp, hadoopDst, new Options.Rename[] 
{Options.Rename.OVERWRITE});
+            renameDone = true;
+            // TODO: this is a workaround of HADOOP-16255 - remove this when 
HADOOP-16255 is
+            // resolved
+            tryRemoveCrcFile(hadoopTemp);
+            return true;
+        } catch (InvocationTargetException | IllegalAccessException e) {
+            throw new IOException(e);
+        } finally {
+            if (!renameDone) {
+                deleteQuietly(tempPath);
+            }
+        }
+    }
+
+    /**
+     * Best-effort removal of the checksum file {@code .<name>.crc} located next to {@code path}.
+     *
+     * <p>Failures are ignored, except {@link VirtualMachineError}, {@link ThreadDeath} and {@link
+     * LinkageError}, which are rethrown.
+     *
+     * @param path file whose sibling checksum file should be deleted
+     * @throws IOException if a fatal exception occurs. Will try to ignore most exceptions.
+     */
+    @SuppressWarnings("CatchMayIgnoreException")
+    private void tryRemoveCrcFile(org.apache.hadoop.fs.Path path) throws 
IOException {
+        try {
+            final org.apache.hadoop.fs.Path checksumFile =
+                    new org.apache.hadoop.fs.Path(
+                            path.getParent(), String.format(".%s.crc", 
path.getName()));
+
+            if (fs.exists(checksumFile)) {
+                // checksum file exists, deleting it
+                fs.delete(checksumFile, true); // recursive=true
+            }
+        } catch (Throwable t) {
+            if (t instanceof VirtualMachineError
+                    || t instanceof ThreadDeath
+                    || t instanceof LinkageError) {
+                throw t;
+            }
+            // else, ignore - we are removing crc file as "best-effort"
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java
index bf14043f3..d523b099d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ReflectionUtils.java
@@ -47,4 +47,21 @@ public class ReflectionUtils {
             throws InvocationTargetException, IllegalAccessException {
         return (T) method.invoke(null, args);
     }
+
+    /**
+     * Finds a method of {@code clz} by name and number of parameters.
+     *
+     * <p>Only the methods returned by {@link Class#getMethods()} are searched, and the first one
+     * whose name and parameter count match is returned; parameter types are not compared.
+     *
+     * @param clz class to search
+     * @param methodName exact method name
+     * @param argSize required number of declared parameters
+     * @return the first matching method
+     * @throws NoSuchMethodException if no method matches; the message is {@code methodName}
+     */
+    public static Method getMethod(Class<?> clz, String methodName, int 
argSize)
+            throws NoSuchMethodException {
+        Method method = null;
+        Method[] methods = clz.getMethods();
+        for (Method m : methods) {
+            if (methodName.equals(m.getName()) && m.getParameterTypes().length 
== argSize) {
+                method = m;
+                break;
+            }
+        }
+
+        if (method == null) {
+            throw new NoSuchMethodException(methodName);
+        }
+        return method;
+    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java 
b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
index 3b6ae7ac3..106dee38e 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOBehaviorTestBase.java
@@ -227,7 +227,7 @@ public abstract class FileIOBehaviorTestBase {
     //  Utilities
     // ------------------------------------------------------------------------
 
-    private static String randomName() {
+    protected static String randomName() {
         return StringUtils.getRandomString(RND, 16, 16, 'a', 'z');
     }
 
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java 
b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
index 67064ff7b..c1b86a0b2 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/FileIOTest.java
@@ -26,6 +26,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -56,4 +58,52 @@ public class FileIOTest {
                         CatalogContext.create(options));
         
assertThat(fileIO).isInstanceOf(RequireOptionsFileIOLoader.MyFileIO.class);
     }
+
+    /**
+     * Shared concurrency check for {@link FileIO#overwriteFileUtf8} and {@link
+     * FileIO#readOverwrittenFileUtf8}.
+     *
+     * <p>A writer thread overwrites {@code file} with the numbers {@code 0..10}, pausing 100 ms
+     * after each write, while a reader thread keeps reading and parsing the file until it sees the
+     * last number. The check passes if neither thread hit an exception.
+     *
+     * @param file file that both threads work on
+     * @param fileIO implementation under test
+     * @throws InterruptedException if the calling thread is interrupted while joining
+     */
+    public static void testOverwriteFileUtf8(Path file, FileIO fileIO) throws InterruptedException {
+        AtomicReference<Exception> exception = new AtomicReference<>();
+        final int max = 10;
+
+        Thread writeThread =
+                new Thread(
+                        () -> {
+                            for (int i = 0; i <= max; i++) {
+                                try {
+                                    fileIO.overwriteFileUtf8(file, "" + i);
+                                    Thread.sleep(100);
+                                } catch (Exception e) {
+                                    // keep the first failure, it is the root cause
+                                    exception.compareAndSet(null, e);
+                                    return;
+                                }
+                            }
+                        });
+
+        Thread readThread =
+                new Thread(
+                        () -> {
+                            // Stop as soon as either thread has failed: if the writer dies, the
+                            // final value is never written and the reader would spin forever.
+                            while (exception.get() == null) {
+                                try {
+                                    Optional<String> ret =
+                                            fileIO.readOverwrittenFileUtf8(file);
+                                    if (!ret.isPresent()) {
+                                        continue;
+                                    }
+
+                                    int value = Integer.parseInt(ret.get());
+                                    if (value == max) {
+                                        return;
+                                    }
+                                } catch (Exception e) {
+                                    exception.compareAndSet(null, e);
+                                    return;
+                                }
+                            }
+                        });
+
+        writeThread.start();
+        readThread.start();
+
+        writeThread.join();
+        readThread.join();
+
+        assertThat(exception.get()).isNull();
+    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java 
b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
index 1939d98a9..df5eb7ba7 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
+import java.io.IOException;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 /** Behavior tests for HDFS. */
@@ -84,4 +87,19 @@ class HdfsBehaviorTest extends FileIOBehaviorTestBase {
     protected Path getBasePath() {
         return basePath;
     }
+
+    /**
+     * Writes a new file and then overwrites it through {@code tryAtomicOverwriteViaRename},
+     * checking after each call that the file holds exactly the content just written.
+     */
+    @Test
+    public void testAtomicWrite() throws IOException {
+        Path file = new Path(getBasePath(), randomName());
+        fs.tryAtomicOverwriteViaRename(file, "Hi");
+        assertThat(fs.readFileUtf8(file)).isEqualTo("Hi");
+
+        fs.tryAtomicOverwriteViaRename(file, "Hello");
+        assertThat(fs.readFileUtf8(file)).isEqualTo("Hello");
+    }
+
+    /** Runs the shared writer/reader check of {@link FileIOTest} on a random file name. */
+    @Test
+    public void testAtomicWriteMultipleThreads() throws InterruptedException {
+        FileIOTest.testOverwriteFileUtf8(new Path(getBasePath(), 
randomName()), fs);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java 
b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
index 1d0bda823..0aad5bd63 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
@@ -26,8 +26,9 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
 /** Consumer which contains next snapshot. */
 public class Consumer {
@@ -58,31 +59,10 @@ public class Consumer {
     }
 
     public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
-        // Consumer updating uses FileIO.newOutputStream(..., overwrite).
-        // But this API may have some intermediate state, the file maybe empty
-        // So retry here to avoid exception when the file is intermediate state
-        int retryNumber = 0;
-        Exception exception = null;
-        while (retryNumber++ < READ_CONSUMER_RETRY_NUM) {
-            try {
-                if (!fileIO.exists(path)) {
-                    return Optional.empty();
-                }
-
-                String json = fileIO.readFileUtf8(path);
-                return Optional.of(Consumer.fromJson(json));
-            } catch (Exception e) {
-                exception = e;
-            }
-
-            try {
-                TimeUnit.MILLISECONDS.sleep(READ_CONSUMER_RETRY_INTERVAL);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-            }
+        try {
+            return 
fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
         }
-
-        throw new RuntimeException("Fails to read snapshot from path " + path, 
exception);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java 
b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index b788c4a28..6a928b81c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -20,14 +20,11 @@ package org.apache.paimon.consumer;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.utils.DateTimeUtils;
 
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
-import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
 import java.util.HashMap;
 import java.util.List;
@@ -59,10 +56,8 @@ public class ConsumerManager implements Serializable {
     }
 
     public void resetConsumer(String consumerId, Consumer consumer) {
-        try (PositionOutputStream out = 
fileIO.newOutputStream(consumerPath(consumerId), true)) {
-            OutputStreamWriter writer = new OutputStreamWriter(out, 
StandardCharsets.UTF_8);
-            writer.write(consumer.toJson());
-            writer.flush();
+        try {
+            fileIO.overwriteFileUtf8(consumerPath(consumerId), 
consumer.toJson());
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
diff --git a/paimon-spark/paimon-spark-common/pom.xml 
b/paimon-spark/paimon-spark-common/pom.xml
index a7491df29..140d3c77d 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -178,6 +178,13 @@ under the License.
             <version>3.1.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-hive-common</artifactId>
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java
index 7a27d7d03..22c9f7175 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkS3ITCase.java
@@ -18,7 +18,11 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOTest;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.s3.MinioTestContainer;
 import 
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
 import org.apache.paimon.testutils.junit.parameterized.Parameters;
@@ -32,6 +36,7 @@ import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -46,12 +51,14 @@ public class SparkS3ITCase {
     @RegisterExtension
     public static final MinioTestContainer MINIO_CONTAINER = new 
MinioTestContainer();
 
+    private static Path warehousePath;
+
     private static SparkSession spark = null;
 
     @BeforeAll
     public static void startMetastoreAndSpark() {
         String path = MINIO_CONTAINER.getS3UriForDefaultBucket() + "/" + 
UUID.randomUUID();
-        Path warehousePath = new Path(path);
+        warehousePath = new Path(path);
         spark = SparkSession.builder().master("local[2]").getOrCreate();
         spark.conf().set("spark.sql.catalog.paimon", 
SparkCatalog.class.getName());
         spark.conf().set("spark.sql.catalog.paimon.warehouse", 
warehousePath.toString());
@@ -83,7 +90,7 @@ public class SparkS3ITCase {
 
     @AfterEach
     public void afterEach() {
-        spark.sql("DROP TABLE T");
+        spark.sql("DROP TABLE IF EXISTS T");
     }
 
     @TestTemplate
@@ -97,4 +104,13 @@ public class SparkS3ITCase {
         List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
         assertThat(rows.toString()).isEqualTo("[[1,2,3]]");
     }
+
+    /**
+     * Runs the shared writer/reader check of {@link FileIOTest} on a random file under the
+     * warehouse path, using a {@link FileIO} created from the MinIO container's S3 options.
+     */
+    @TestTemplate
+    public void testS3AtomicWriteMultipleThreads() throws 
InterruptedException, IOException {
+        Path file = new Path(warehousePath, UUID.randomUUID().toString());
+        Options options = new Options();
+        // copy every S3 option of the container into the FileIO options
+        MINIO_CONTAINER.getS3ConfigOptions().forEach(options::setString);
+        FileIO fileIO = FileIO.get(file, CatalogContext.create(options));
+        FileIOTest.testOverwriteFileUtf8(file, fileIO);
+    }
 }

Reply via email to