This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fde841f09990717826c3a5c3f1c1434148ee21ca
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed May 15 08:25:30 2024 -0700

    [HUDI-7750] Move HoodieLogFormatWriter class to hoodie-hadoop-common module (#11207)
---
 .../java/org/apache/hudi/common/fs/FSUtils.java    | 26 ----------------------
 .../hudi/common/table/log/HoodieLogFormat.java     |  8 ++++++-
 .../common/table/log/HoodieLogFormatWriter.java    | 21 ++++++++---------
 .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java   | 25 +++++++++++++++++++++
 4 files changed, 43 insertions(+), 37 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index ecbe3fc1766..30c968d080d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -43,8 +43,6 @@ import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.StorageSchemes;
 import org.apache.hudi.storage.inline.InLineFSUtils;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +80,6 @@ public class FSUtils {
       
Pattern.compile("^\\.(.+)_(.*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(.cdc)?)?");
   public static final Pattern PREFIX_BY_FILE_ID_PATTERN = 
Pattern.compile("^(.+)-(\\d+)");
 
-  private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
   private static final String LOG_FILE_EXTENSION = ".log";
 
   private static final StoragePathFilter ALLOW_ALL_FILTER = file -> true;
@@ -731,29 +728,6 @@ public class FSUtils {
     return pathInfoList;
   }
 
-  /**
-   * When a file was opened and the task died without closing the stream, 
another task executor cannot open because the
-   * existing lease will be active. We will try to recover the lease, from 
HDFS. If a data node went down, it takes
-   * about 10 minutes for the lease to be recovered. But if the client dies, 
this should be instant.
-   */
-  public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, 
final Path p)
-      throws IOException, InterruptedException {
-    LOG.info("Recover lease on dfs file {}", p);
-    // initiate the recovery
-    boolean recovered = false;
-    for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; 
nbAttempt++) {
-      LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
-      recovered = dfs.recoverLease(p);
-      if (recovered) {
-        break;
-      }
-      // Sleep for 1 second before trying again. Typically it takes about 2-3 
seconds to recover
-      // under default settings
-      Thread.sleep(1000);
-    }
-    return recovered;
-  }
-
   /**
    * Serializable function interface.
    *
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index 7d27d164559..ba95a5cdafc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -58,6 +59,8 @@ public interface HoodieLogFormat {
 
   String DEFAULT_WRITE_TOKEN = "0-0-0";
 
+  String DEFAULT_LOG_FORMAT_WRITER = 
"org.apache.hudi.common.table.log.HoodieLogFormatWriter";
+
   /**
    * Writer interface to allow appending block to this file format.
    */
@@ -284,7 +287,10 @@ public interface HoodieLogFormat {
       if (sizeThreshold == null) {
         sizeThreshold = DEFAULT_SIZE_THRESHOLD;
       }
-      return new HoodieLogFormatWriter(storage, logFile, bufferSize, 
replication, sizeThreshold,
+      return (Writer) ReflectionUtils.loadClass(
+          DEFAULT_LOG_FORMAT_WRITER,
+          new Class[] {HoodieStorage.class, HoodieLogFile.class, 
Integer.class, Short.class, Long.class, String.class, 
HoodieLogFileWriteCallback.class},
+          storage, logFile, bufferSize, replication, sizeThreshold,
           rolloverLogWriteToken, logFileWriteCallback);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
similarity index 95%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 7e10d5064f9..ca7b30d7d03 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -7,23 +7,24 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.HoodieLogFormat.WriterBuilder;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageSchemes;
 
@@ -62,8 +63,8 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
 
   private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not 
sufficiently replicated yet";
 
-  HoodieLogFormatWriter(HoodieStorage storage, HoodieLogFile logFile, Integer 
bufferSize, Short replication, Long sizeThreshold,
-                        String rolloverLogWriteToken, 
HoodieLogFileWriteCallback logFileWriteCallback) {
+  public HoodieLogFormatWriter(HoodieStorage storage, HoodieLogFile logFile, 
Integer bufferSize, Short replication, Long sizeThreshold,
+                               String rolloverLogWriteToken, 
HoodieLogFileWriteCallback logFileWriteCallback) {
     this.storage = storage;
     this.logFile = logFile;
     this.sizeThreshold = sizeThreshold;
@@ -334,7 +335,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       // data node is going down. Note that we can only try to recover lease 
for a DistributedFileSystem.
       // ViewFileSystem unfortunately does not support this operation
       LOG.warn("Trying to recover log on path " + path);
-      if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
+      if (HadoopFSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) 
{
         LOG.warn("Recovered lease on path " + path);
         // try again
         this.output = fs.append(path, bufferSize);
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index ca504577b40..44be55438a1 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +68,7 @@ import java.util.stream.Collectors;
 public class HadoopFSUtils {
   private static final Logger LOG = 
LoggerFactory.getLogger(HadoopFSUtils.class);
   private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
+  private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
 
   public static Configuration prepareHadoopConf(Configuration conf) {
     // look for all properties, prefixed to be picked up
@@ -539,4 +541,27 @@ public class HadoopFSUtils {
         },
         paths);
   }
+
+  /**
+   * When a file was opened and the task died without closing the stream, 
another task executor cannot open because the
+   * existing lease will be active. We will try to recover the lease, from 
HDFS. If a data node went down, it takes
+   * about 10 minutes for the lease to be recovered. But if the client dies, 
this should be instant.
+   */
+  public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, 
final Path p)
+      throws IOException, InterruptedException {
+    LOG.info("Recover lease on dfs file {}", p);
+    // initiate the recovery
+    boolean recovered = false;
+    for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; 
nbAttempt++) {
+      LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
+      recovered = dfs.recoverLease(p);
+      if (recovered) {
+        break;
+      }
+      // Sleep for 1 second before trying again. Typically it takes about 2-3 
seconds to recover
+      // under default settings
+      Thread.sleep(1000);
+    }
+    return recovered;
+  }
 }

Reply via email to