[CARBONDATA-1277] Dictionary generation failure due to HDFS lease expiry. This closes #1147
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/285ce72d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/285ce72d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/285ce72d Branch: refs/heads/streaming_ingest Commit: 285ce72d4c9b3364bbdc454f4b6b331b3caa42db Parents: 8b31f09 Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Sat Jul 8 15:46:25 2017 +0530 Committer: Venkata Ramana G <ramana.gollam...@huawei.com> Committed: Tue Jul 11 18:00:18 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 7 + .../core/datastore/impl/FileFactory.java | 21 ++ .../AtomicFileOperationsImpl.java | 5 +- .../apache/carbondata/core/util/CarbonUtil.java | 2 +- .../core/util/path/HDFSLeaseUtils.java | 215 +++++++++++++++++++ .../core/writer/CarbonDictionaryWriterImpl.java | 20 +- .../carbondata/core/writer/ThriftWriter.java | 2 +- 7 files changed, 266 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 208bab8..8110abb 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1287,6 +1287,13 @@ public final class CarbonCommonConstants { public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE"; + @CarbonProperty + public static final String CARBON_LEASE_RECOVERY_RETRY_COUNT = + "carbon.lease.recovery.retry.count"; 
+ @CarbonProperty + public static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL = + "carbon.lease.recovery.retry.interval"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java index 7acd6b1..2a35ab3 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java @@ -518,4 +518,25 @@ public final class FileFactory { } } + /** + * This method will create the path object for a given file + * + * @param filePath + * @return + */ + public static Path getPath(String filePath) { + return new Path(filePath); + } + + /** + * This method will return the filesystem instance + * + * @param path + * @return + * @throws IOException + */ + public static FileSystem getFileSystem(Path path) throws IOException { + return path.getFileSystem(configuration); + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java index befc76e..61690ff 100644 --- a/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/fileoperations/AtomicFileOperationsImpl.java @@ -25,6 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import 
org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.impl.FileFactory.FileType; +import org.apache.carbondata.core.util.CarbonUtil; public class AtomicFileOperationsImpl implements AtomicFileOperations { @@ -67,10 +68,8 @@ public class AtomicFileOperationsImpl implements AtomicFileOperations { @Override public void close() throws IOException { if (null != dataOutStream) { - dataOutStream.close(); - + CarbonUtil.closeStream(dataOutStream); CarbonFile tempFile = FileFactory.getCarbonFile(tempWriteFilePath, fileType); - if (!tempFile.renameForce(filePath)) { throw new IOException("temporary file renaming failed, src=" + tempFile.getPath() + ", dest=" + filePath); http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 8298600..06b2a61 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -128,7 +128,7 @@ public final class CarbonUtil { try { closeStream(stream); } catch (IOException e) { - LOGGER.error("Error while closing stream:" + e); + LOGGER.error(e, "Error while closing stream:" + e); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java new file mode 100644 index 0000000..c72c322 --- /dev/null +++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/HDFSLeaseUtils.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.util.path; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonProperties; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.viewfs.ViewFileSystem; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; + +/** + * Implementation for HDFS utility methods + */ +public class HDFSLeaseUtils { + + private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN = 1; + private static final int CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX = 50; + private static final String CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT = "5"; + private static final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN = 1000; + private static 
final int CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX = 10000; + private static final String CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT = "1000"; + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(HDFSLeaseUtils.class.getName()); + + /** + * This method will validate whether the exception thrown if for lease recovery from HDFS + * + * @param message + * @return + */ + public static boolean checkExceptionMessageForLeaseRecovery(String message) { + // depending on the scenario few more cases can be added for validating lease recovery exception + if (null != message && message.contains("Failed to APPEND_FILE")) { + return true; + } + return false; + } + + /** + * This method will make attempts to recover lease on a file using the + * distributed file system utility. + * + * @param filePath + * @return + * @throws IOException + */ + public static boolean recoverFileLease(String filePath) throws IOException { + LOGGER.info("Trying to recover lease on file: " + filePath); + FileFactory.FileType fileType = FileFactory.getFileType(filePath); + switch (fileType) { + case ALLUXIO: + case HDFS: + Path path = FileFactory.getPath(filePath); + FileSystem fs = FileFactory.getFileSystem(path); + return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) fs); + case VIEWFS: + path = FileFactory.getPath(filePath); + fs = FileFactory.getFileSystem(path); + ViewFileSystem viewFileSystem = (ViewFileSystem) fs; + Path targetFileSystemPath = viewFileSystem.resolvePath(path); + FileSystem targetFileSystem = FileFactory.getFileSystem(targetFileSystemPath); + if (targetFileSystem instanceof DistributedFileSystem) { + return recoverLeaseOnFile(filePath, path, (DistributedFileSystem) targetFileSystem); + } else { + LOGGER.error( + "Invalid file type. Lease recovery is not supported on filesystem with file: " + + filePath); + return false; + } + default: + LOGGER.error("Invalid file type. 
Lease recovery is not supported on filesystem with file: " + + filePath); + return false; + } + } + + /** + * Recovers lease on a file + * + * @param filePath + * @param path + * @param fs + * @return + * @throws IOException + */ + private static boolean recoverLeaseOnFile(String filePath, Path path, DistributedFileSystem fs) + throws IOException { + DistributedFileSystem dfs = fs; + int maxAttempts = getLeaseRecoveryRetryCount(); + int retryInterval = getLeaseRecoveryRetryInterval(); + boolean leaseRecovered = false; + IOException ioException = null; + for (int retryCount = 1; retryCount <= maxAttempts; retryCount++) { + try { + leaseRecovered = dfs.recoverLease(path); + if (!leaseRecovered) { + try { + LOGGER.info( + "Failed to recover lease after attempt " + retryCount + " . Will try again after " + + retryInterval + " ms..."); + Thread.sleep(retryInterval); + } catch (InterruptedException e) { + LOGGER.error(e, + "Interrupted exception occurred while recovering lease for file : " + filePath); + } + } + } catch (IOException e) { + if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) { + LOGGER.error("The given file does not exist at path " + filePath); + throw e; + } else if (e instanceof FileNotFoundException) { + LOGGER.error("The given file does not exist at path " + filePath); + throw e; + } else { + LOGGER.error("Recover lease threw exception : " + e.getMessage()); + ioException = e; + } + } + LOGGER.info("Retrying again after interval of " + retryInterval + " ms..."); + } + if (leaseRecovered) { + LOGGER.info("Successfully able to recover lease on file: " + filePath); + return true; + } else { + LOGGER.error( + "Failed to recover lease on file: " + filePath + " after retrying for " + maxAttempts + + " at an interval of " + retryInterval); + if (null != ioException) { + throw ioException; + } else { + return false; + } + } + } + + private static int getLeaseRecoveryRetryCount() { + String retryMaxAttempts = 
CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT, + CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT); + int retryCount = 0; + try { + retryCount = Integer.parseInt(retryMaxAttempts); + if (retryCount < CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN + || retryCount > CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX) { + retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT); + LOGGER.warn( + "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT + + " is not in allowed range. Allowed range is >=" + + CARBON_LEASE_RECOVERY_RETRY_COUNT_MIN + " and <=" + + CARBON_LEASE_RECOVERY_RETRY_COUNT_MAX + ". Therefore considering default value: " + + retryCount); + } + } catch (NumberFormatException ne) { + retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_COUNT_DEFAULT); + LOGGER.warn("value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_COUNT + + " is incorrect. Therefore considering default value: " + retryCount); + } + return retryCount; + } + + private static int getLeaseRecoveryRetryInterval() { + String retryMaxAttempts = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL, + CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT); + int retryCount = 0; + try { + retryCount = Integer.parseInt(retryMaxAttempts); + if (retryCount < CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN + || retryCount > CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX) { + retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT); + LOGGER.warn( + "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL + + " is not in allowed range. Allowed range is >=" + + CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MIN + " and <=" + + CARBON_LEASE_RECOVERY_RETRY_INTERVAL_MAX + + ". 
Therefore considering default value (ms): " + retryCount); + } + } catch (NumberFormatException ne) { + retryCount = Integer.parseInt(CARBON_LEASE_RECOVERY_RETRY_INTERVAL_DEFAULT); + LOGGER.warn( + "value configured for " + CarbonCommonConstants.CARBON_LEASE_RECOVERY_RETRY_INTERVAL + + " is incorrect. Therefore considering default value (ms): " + retryCount); + } + return retryCount; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java index 9de41e1..64ff202 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImpl.java @@ -37,6 +37,7 @@ import org.apache.carbondata.core.service.CarbonCommonFactory; import org.apache.carbondata.core.service.PathService; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.util.path.HDFSLeaseUtils; import org.apache.carbondata.format.ColumnDictionaryChunk; import org.apache.carbondata.format.ColumnDictionaryChunkMeta; @@ -359,7 +360,24 @@ public class CarbonDictionaryWriterImpl implements CarbonDictionaryWriter { // create thrift writer instance dictionaryThriftWriter = new ThriftWriter(dictionaryFile, true); // open the file stream - dictionaryThriftWriter.open(); + try { + dictionaryThriftWriter.open(); + } catch (IOException e) { + // Cases to handle + // 1. 
Handle File lease recovery + if (HDFSLeaseUtils.checkExceptionMessageForLeaseRecovery(e.getMessage())) { + LOGGER.error(e, "Lease recovery exception encountered for file: " + dictionaryFile); + boolean leaseRecovered = HDFSLeaseUtils.recoverFileLease(dictionaryFile); + if (leaseRecovered) { + // try to open output stream again after recovering the lease on file + dictionaryThriftWriter.open(); + } else { + throw e; + } + } else { + throw e; + } + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/285ce72d/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java index 9bf549d..d7b1a0f 100644 --- a/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/writer/ThriftWriter.java @@ -136,7 +136,7 @@ public class ThriftWriter { */ public void close() throws IOException { closeAtomicFileWriter(); - CarbonUtil.closeStreams(dataOutputStream); + CarbonUtil.closeStream(dataOutputStream); } /**