This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1e559f7 [HOTFIX] Fix INSERT STAGE footer read error
1e559f7 is described below
commit 1e559f72a3dc5233d2c239f546790299f0be701a
Author: Jacky Li <[email protected]>
AuthorDate: Wed Jan 8 14:54:25 2020 +0800
[HOTFIX] Fix INSERT STAGE footer read error
Why is this PR needed?
There are 2 issues:
1. INSERT STAGE skips bytes to read the last long in the carbondata file,
which is the offset to the footer, but sometimes DFS does not skip the
expected number of bytes
2. When an error occurs, the segment status is not in SUCCESS state, but
recovery handling is not done correctly in the INSERT STAGE command
What changes were proposed in this PR?
1. In CarbonFooterReaderV3.readFooterVersion3, if the offset is not set by
the caller, read it from the corresponding index file.
2. In CarbonInsertFromStageCommand, when recovery is required, delete the
segment entry if the entry is not in the SUCCESS state.
This closes #3561
---
.../core/reader/CarbonFooterReaderV3.java | 25 +-------------
.../apache/carbondata/core/util/CarbonUtil.java | 39 +++++++++++++++++++---
.../core/util/DataFileFooterConverterV3.java | 3 +-
.../carbondata/core/util/path/CarbonTablePath.java | 14 ++++++++
.../TestCreateTableWithBlockletSize.scala | 3 +-
.../TestNonTransactionalCarbonTable.scala | 3 +-
.../management/CarbonInsertFromStageCommand.scala | 6 ++--
.../carbondata/sdk/file/CarbonSchemaReader.java | 2 +-
.../carbondata/sdk/file/CSVCarbonWriterTest.java | 2 +-
.../java/org/apache/carbondata/tool/DataFile.java | 2 +-
10 files changed, 58 insertions(+), 41 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
index e0c6121..2fbcd6c 100644
---
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
+++
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
@@ -17,14 +17,10 @@
package org.apache.carbondata.core.reader;
-import java.io.DataInputStream;
import java.io.IOException;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.format.FileFooter3;
-import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TBase;
/**
@@ -36,15 +32,11 @@ public class CarbonFooterReaderV3 {
//Fact file path
private String filePath;
- // size of the file
- private long fileSize;
-
//start offset of the file footer
private long footerOffset;
- public CarbonFooterReaderV3(String filePath, long fileSize, long offset) {
+ public CarbonFooterReaderV3(String filePath, long offset) {
this.filePath = filePath;
- this.fileSize = fileSize;
this.footerOffset = offset;
}
@@ -58,21 +50,6 @@ public class CarbonFooterReaderV3 {
ThriftReader thriftReader = openThriftReader(filePath);
thriftReader.open();
- // If footer offset is 0, means caller does not set it,
- // so we read it from the end of the file
- if (footerOffset == 0) {
- Configuration conf = FileFactory.getConfiguration();
- try (DataInputStream reader = FileFactory.getDataInputStream(filePath,
conf)) {
- long skipBytes = fileSize - CarbonCommonConstants.LONG_SIZE_IN_BYTE;
- long skipped = reader.skip(skipBytes);
- if (skipped != skipBytes) {
- throw new IOException(String.format(
- "expect skip %d bytes, but actually skipped %d bytes",
skipBytes, skipped));
- }
- footerOffset = reader.readLong();
- }
- }
-
// Set the offset from where it should read
thriftReader.setReadOffset(footerOffset);
FileFooter3 footer = (FileFooter3) thriftReader.read();
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 79daeaa..1bd4d89 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -914,10 +914,15 @@ public final class CarbonUtil {
boolean forceReadDataFileFooter) throws IOException {
BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo();
if (detailInfo == null || forceReadDataFileFooter) {
- AbstractDataFileFooterConverter fileFooterConverter =
- DataFileFooterConverterFactory.getInstance()
- .getDataFileFooterConverter(tableBlockInfo.getVersion());
- return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+ if (tableBlockInfo.getBlockOffset() == 0) {
+ // caller does not set the footer offset, so read index file and
convert to footer
+ return getDataFileFooterFromIndexFile(tableBlockInfo);
+ } else {
+ AbstractDataFileFooterConverter fileFooterConverter =
+ DataFileFooterConverterFactory.getInstance()
+ .getDataFileFooterConverter(tableBlockInfo.getVersion());
+ return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+ }
} else {
DataFileFooter fileFooter = new DataFileFooter();
fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp());
@@ -934,6 +939,32 @@ public final class CarbonUtil {
}
/**
+ * Read index file and convert to footer.
+ * Currently, this is used only in INSERT STAGE command to load data files
written by SDK
+ */
+ private static DataFileFooter getDataFileFooterFromIndexFile(TableBlockInfo
tableBlockInfo)
+ throws IOException {
+ String filePath = tableBlockInfo.getFilePath();
+ String dataFilePath = tableBlockInfo.getFilePath();
+ String shardName = CarbonTablePath.getShardName(filePath);
+ String indexFilePath = String
+ .format("%s/%s%s", CarbonTablePath.getParentPath(filePath), shardName,
+ CarbonCommonConstants.UPDATE_INDEX_FILE_EXT);
+ ColumnarFormatVersion version = ColumnarFormatVersion.valueOf((short)3);
+ AbstractDataFileFooterConverter footerConverter =
+
DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
+ List<DataFileFooter> footers = footerConverter.getIndexInfo(indexFilePath,
null, true);
+
+ // find the footer of the input data file (tableBlockInfo)
+ for (DataFileFooter footer : footers) {
+ if
(footer.getBlockInfo().getTableBlockInfo().getFilePath().equals(dataFilePath)) {
+ return footer;
+ }
+ }
+ throw new RuntimeException("Footer not found in index file");
+ }
+
+ /**
* Below method will be used to get the number of dimension column
* in carbon column schema
*
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index b917559..65ba609 100644
---
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -64,8 +64,7 @@ public class DataFileFooterConverterV3 extends
AbstractDataFileFooterConverter {
CarbonHeaderReader carbonHeaderReader = new
CarbonHeaderReader(tableBlockInfo.getFilePath());
FileHeader fileHeader = carbonHeaderReader.readHeader();
CarbonFooterReaderV3 reader =
- new CarbonFooterReaderV3(tableBlockInfo.getFilePath(),
tableBlockInfo.getBlockLength(),
- tableBlockInfo.getBlockOffset());
+ new CarbonFooterReaderV3(tableBlockInfo.getFilePath(),
tableBlockInfo.getBlockOffset());
FileFooter3 footer = reader.readFooterVersion3();
return convertDataFileFooter(fileHeader, footer);
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index ea7542e..c25b536 100644
---
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -715,4 +715,18 @@ public class CarbonTablePath {
CarbonCommonConstants.FILE_SEPARATOR + taskNo;
}
}
+
+ /**
+ * Return the parent path of the input file.
+ * For example, if input file path is /user/warehouse/t1/file.carbondata
+ * then return will be /user/warehouse/t1
+ */
+ public static String getParentPath(String dataFilePath) {
+ int endIndex =
dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
+ if (endIndex > -1) {
+ return dataFilePath.substring(0, endIndex);
+ } else {
+ return dataFilePath;
+ }
+ }
}
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
index 2054a7b..ee0784b 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
@@ -61,8 +61,7 @@ class TestCreateTableWithBlockletSize extends QueryTest with
BeforeAndAfterAll {
.getFileHolder(FileFactory.getFileType(dataFile.getPath))
val buffer = fileReader
.readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath),
dataFile.getSize - 8, 8)
- val footerReader = new CarbonFooterReaderV3(
- dataFile.getAbsolutePath, dataFile.getSize, buffer.getLong)
+ val footerReader = new CarbonFooterReaderV3(dataFile.getAbsolutePath,
buffer.getLong)
val footer = footerReader.readFooterVersion3
assertResult(2)(footer.blocklet_index_list.size)
assertResult(2)(footer.blocklet_info_list3.size)
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 2834526..2f0acef 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -2570,8 +2570,7 @@ class TestNonTransactionalCarbonTable extends QueryTest
with BeforeAndAfterAll {
.getFileHolder(FileFactory.getFileType(dataFile.getPath))
val buffer = fileReader
.readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath),
dataFile.getSize - 8, 8)
- val footerReader = new CarbonFooterReaderV3(
- dataFile.getAbsolutePath, dataFile.getSize, buffer.getLong)
+ val footerReader = new CarbonFooterReaderV3(dataFile.getAbsolutePath,
buffer.getLong)
val footer = footerReader.readFooterVersion3
// without page_size configuration there will be only 1 page, now it
will be more.
assert(footer.getBlocklet_info_list3.get(0).number_number_of_pages != 1)
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 0d1121d..0f5c4ae 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -220,16 +220,14 @@ case class CarbonInsertFromStageCommand(
}.map { future =>
future.get()
}
- case SegmentStatus.INSERT_IN_PROGRESS =>
+ case other =>
// delete entry in table status and load again
- LOGGER.info(s"Segment $segmentId is in INSERT_IN_PROGRESS state,
about to delete the " +
+ LOGGER.warn(s"Segment $segmentId is in $other state, about to delete
the " +
s"segment entry and load again")
val segmentToWrite =
segments.filterNot(_.getLoadName.equals(segmentId))
SegmentStatusManager.writeLoadDetailsIntoFile(
CarbonTablePath.getTableStatusFilePath(table.getTablePath),
segmentToWrite)
- case other =>
- throw new RuntimeException(s"Segment $segmentId is in unexpected
state: $other")
}
} finally {
if (lock != null) {
diff --git
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index 2732d3a..c692f6e 100644
---
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -320,7 +320,7 @@ public class CarbonSchemaReader {
fileReader.readByteBuffer(FileFactory.getUpdatedFilePath(dataFilePath),
fileSize - 8, 8);
fileReader.finish();
CarbonFooterReaderV3 footerReader =
- new CarbonFooterReaderV3(dataFilePath, fileSize, buffer.getLong());
+ new CarbonFooterReaderV3(dataFilePath, buffer.getLong());
FileFooter3 footer = footerReader.readFooterVersion3();
if (null != footer.getExtra_info()) {
return
footer.getExtra_info().get(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO)
diff --git
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index ef2ec5e..533480c 100644
---
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -428,7 +428,7 @@ public class CSVCarbonWriterTest {
dataFile.getPath()), dataFile.getSize() - 8, 8);
fileReader.finish();
CarbonFooterReaderV3 footerReader =
- new CarbonFooterReaderV3(dataFile.getAbsolutePath(),
dataFile.getSize(), buffer.getLong());
+ new CarbonFooterReaderV3(dataFile.getAbsolutePath(),
buffer.getLong());
FileFooter3 footer = footerReader.readFooterVersion3();
Assert.assertEquals(2, footer.blocklet_index_list.size());
Assert.assertEquals(2, footer.blocklet_info_list3.size());
diff --git a/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
b/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
index a1fdad5..4ed3945 100644
--- a/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
+++ b/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
@@ -170,7 +170,7 @@ class DataFile {
this.footerOffset = buffer.getLong();
this.footerSizeInBytes = this.fileSizeInBytes - footerOffset;
CarbonFooterReaderV3 footerReader =
- new CarbonFooterReaderV3(dataFile.getAbsolutePath(),
dataFile.getSize(), footerOffset);
+ new CarbonFooterReaderV3(dataFile.getAbsolutePath(), footerOffset);
return footerReader.readFooterVersion3();
}