This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 86b9e5d [CARBONDATA-3571] Add table status file read retry for query 86b9e5d is described below commit 86b9e5d5bbcef8fb2e36718db8a4dc14aa2bb9e9 Author: Jacky Li <jacky.li...@qq.com> AuthorDate: Tue Nov 5 16:29:54 2019 +0800 [CARBONDATA-3571] Add table status file read retry for query When storing table status file in object store, reading of table status file may fail (receive EOFException) when table status file is being modified. To protect from this scenario, this PR adds retry when reading table status file This closes #3435 --- .../core/statusmanager/SegmentStatusManager.java | 64 +++++++++++++++------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 1f645f6..5fbcb28 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -22,6 +22,7 @@ import java.io.BufferedWriter; import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; @@ -29,6 +30,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -53,10 +55,13 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DeleteLoadFolders; import org.apache.carbondata.core.util.path.CarbonTablePath; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.DEFAULT_CHARSET; + import 
com.google.gson.Gson; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; + /** * Manages Load/Segment status */ @@ -69,6 +74,8 @@ public class SegmentStatusManager { private Configuration configuration; + private static final int READ_TABLE_STATUS_RETRY_COUNT = 3; + public SegmentStatusManager(AbsoluteTableIdentifier identifier) { this.identifier = identifier; configuration = FileFactory.getConfiguration(); @@ -254,33 +261,52 @@ public class SegmentStatusManager { DataInputStream dataInputStream = null; BufferedReader buffReader = null; InputStreamReader inStream = null; - LoadMetadataDetails[] listOfLoadFolderDetailsArray; + LoadMetadataDetails[] loadFolderDetails = null; AtomicFileOperations fileOperation = AtomicFileOperationFactory.getAtomicFileOperations(tableStatusPath); - try { - if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) { - return new LoadMetadataDetails[0]; + if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) { + return new LoadMetadataDetails[0]; + } + + // When storing table status file in object store, reading of table status file may + // fail (receive EOFException) when table status file is being modifying + // so here we retry multiple times before throwing EOFException + int retry = READ_TABLE_STATUS_RETRY_COUNT; + while (retry > 0) { + try { + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, Charset.forName(DEFAULT_CHARSET)); + buffReader = new BufferedReader(inStream); + loadFolderDetails = gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class); + retry = 0; + } catch (EOFException ex) { + retry--; + if (retry == 0) { + // we have retried several times, throw this exception to make the execution failed + LOG.error("Failed to read table status file after retry", ex); + throw ex; + } + try { + // sleep for some time before retry + TimeUnit.MILLISECONDS.sleep(10); + } catch 
(InterruptedException e) { + // ignored + } + } catch (IOException e) { + LOG.error("Failed to read table status file", e); + throw e; + } finally { + closeStreams(buffReader, inStream, dataInputStream); } - dataInputStream = fileOperation.openForRead(); - inStream = new InputStreamReader(dataInputStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)); - buffReader = new BufferedReader(inStream); - listOfLoadFolderDetailsArray = - gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class); - } catch (IOException e) { - LOG.error("Failed to read metadata of load", e); - throw e; - } finally { - closeStreams(buffReader, inStream, dataInputStream); } // if listOfLoadFolderDetailsArray is null, return empty array - if (null == listOfLoadFolderDetailsArray) { + if (null == loadFolderDetails) { return new LoadMetadataDetails[0]; } - return listOfLoadFolderDetailsArray; + return loadFolderDetails; } /** @@ -518,7 +544,7 @@ public class SegmentStatusManager { try { dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + Charset.forName(DEFAULT_CHARSET))); String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetailsArray); brWriter.write(metadataInstance); @@ -889,7 +915,7 @@ public class SegmentStatusManager { dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE); brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + Charset.forName(DEFAULT_CHARSET))); String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); brWriter.write(metadataInstance);