This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fde841f09990717826c3a5c3f1c1434148ee21ca Author: Y Ethan Guo <[email protected]> AuthorDate: Wed May 15 08:25:30 2024 -0700 [HUDI-7750] Move HoodieLogFormatWriter class to hoodie-hadoop-common module (#11207) --- .../java/org/apache/hudi/common/fs/FSUtils.java | 26 ---------------------- .../hudi/common/table/log/HoodieLogFormat.java | 8 ++++++- .../common/table/log/HoodieLogFormatWriter.java | 21 ++++++++--------- .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java | 25 +++++++++++++++++++++ 4 files changed, 43 insertions(+), 37 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index ecbe3fc1766..30c968d080d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -43,8 +43,6 @@ import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.storage.StorageSchemes; import org.apache.hudi.storage.inline.InLineFSUtils; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +80,6 @@ public class FSUtils { Pattern.compile("^\\.(.+)_(.*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(.cdc)?)?"); public static final Pattern PREFIX_BY_FILE_ID_PATTERN = Pattern.compile("^(.+)-(\\d+)"); - private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final String LOG_FILE_EXTENSION = ".log"; private static final StoragePathFilter ALLOW_ALL_FILTER = file -> true; @@ -731,29 +728,6 @@ public class FSUtils { return pathInfoList; } - /** - * When a file was opened and the task died without closing the stream, another task executor cannot open because the - * existing lease will be active. We will try to recover the lease, from HDFS. If a data node went down, it takes - * about 10 minutes for the lease to be recovered. 
But if the client dies, this should be instant. - */ - public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p) - throws IOException, InterruptedException { - LOG.info("Recover lease on dfs file {}", p); - // initiate the recovery - boolean recovered = false; - for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) { - LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p); - recovered = dfs.recoverLease(p); - if (recovered) { - break; - } - // Sleep for 1 second before trying again. Typically it takes about 2-3 seconds to recover - // under default settings - Thread.sleep(1000); - } - return recovered; - } - /** * Serializable function interface. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 7d27d164559..ba95a5cdafc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; @@ -58,6 +59,8 @@ public interface HoodieLogFormat { String DEFAULT_WRITE_TOKEN = "0-0-0"; + String DEFAULT_LOG_FORMAT_WRITER = "org.apache.hudi.common.table.log.HoodieLogFormatWriter"; + /** * Writer interface to allow appending block to this file format. 
*/ @@ -284,7 +287,10 @@ public interface HoodieLogFormat { if (sizeThreshold == null) { sizeThreshold = DEFAULT_SIZE_THRESHOLD; } - return new HoodieLogFormatWriter(storage, logFile, bufferSize, replication, sizeThreshold, + return (Writer) ReflectionUtils.loadClass( + DEFAULT_LOG_FORMAT_WRITER, + new Class[] {HoodieStorage.class, HoodieLogFile.class, Integer.class, Short.class, Long.class, String.class, HoodieLogFileWriteCallback.class}, + storage, logFile, bufferSize, replication, sizeThreshold, rolloverLogWriteToken, logFileWriteCallback); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java similarity index 95% rename from hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index 7e10d5064f9..ca7b30d7d03 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -7,23 +7,24 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. */ package org.apache.hudi.common.table.log; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.log.HoodieLogFormat.WriterBuilder; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StorageSchemes; @@ -62,8 +63,8 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet"; - HoodieLogFormatWriter(HoodieStorage storage, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, - String rolloverLogWriteToken, HoodieLogFileWriteCallback logFileWriteCallback) { + public HoodieLogFormatWriter(HoodieStorage storage, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, + String rolloverLogWriteToken, HoodieLogFileWriteCallback logFileWriteCallback) { this.storage = storage; this.logFile = logFile; this.sizeThreshold = sizeThreshold; @@ -334,7 +335,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { // data node is going down. Note that we can only try to recover lease for a DistributedFileSystem. 
// ViewFileSystem unfortunately does not support this operation LOG.warn("Trying to recover log on path " + path); - if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) { + if (HadoopFSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) { LOG.warn("Recovered lease on path " + path); // try again this.output = fs.append(path, bufferSize); diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java index ca504577b40..44be55438a1 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java @@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,7 @@ import java.util.stream.Collectors; public class HadoopFSUtils { private static final Logger LOG = LoggerFactory.getLogger(HadoopFSUtils.class); private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_"; + private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; public static Configuration prepareHadoopConf(Configuration conf) { // look for all properties, prefixed to be picked up @@ -539,4 +541,27 @@ public class HadoopFSUtils { }, paths); } + + /** + * When a file was opened and the task died without closing the stream, another task executor cannot open because the + * existing lease will be active. We will try to recover the lease, from HDFS. If a data node went down, it takes + * about 10 minutes for the lease to be recovered. But if the client dies, this should be instant. 
+ */ + public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p) + throws IOException, InterruptedException { + LOG.info("Recover lease on dfs file {}", p); + // initiate the recovery + boolean recovered = false; + for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) { + LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p); + recovered = dfs.recoverLease(p); + if (recovered) { + break; + } + // Sleep for 1 second before trying again. Typically it takes about 2-3 seconds to recover + // under default settings + Thread.sleep(1000); + } + return recovered; + } }
