This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e559f7  [HOTFIX] Fix INSERT STAGE footer read error
1e559f7 is described below

commit 1e559f72a3dc5233d2c239f546790299f0be701a
Author: Jacky Li <[email protected]>
AuthorDate: Wed Jan 8 14:54:25 2020 +0800

    [HOTFIX] Fix INSERT STAGE footer read error
    
    Why is this PR needed?
    There are 2 issues:
    1. INSERT STAGE skips bytes to read the last long in the carbondata file as
    the offset to the footer, but sometimes the DFS does not skip the expected
    number of bytes
    2. When an error occurs, the segment is not in SUCCESS state, but the
    INSERT STAGE command does not handle recovery correctly
    
    What changes were proposed in this PR?
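    1. When CarbonUtil reads a data file footer and the footer offset is not
    set by the caller, read the footer from the corresponding index file,
    instead of reading the offset from the end of the data file in
    CarbonFooterReaderV3.readFooterVersion3.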
    2. In CarbonInsertFromStageCommand, when recovery is required, delete the
    segment entry if it is not in SUCCESS state.
    
    This closes #3561
---
 .../core/reader/CarbonFooterReaderV3.java          | 25 +-------------
 .../apache/carbondata/core/util/CarbonUtil.java    | 39 +++++++++++++++++++---
 .../core/util/DataFileFooterConverterV3.java       |  3 +-
 .../carbondata/core/util/path/CarbonTablePath.java | 14 ++++++++
 .../TestCreateTableWithBlockletSize.scala          |  3 +-
 .../TestNonTransactionalCarbonTable.scala          |  3 +-
 .../management/CarbonInsertFromStageCommand.scala  |  6 ++--
 .../carbondata/sdk/file/CarbonSchemaReader.java    |  2 +-
 .../carbondata/sdk/file/CSVCarbonWriterTest.java   |  2 +-
 .../java/org/apache/carbondata/tool/DataFile.java  |  2 +-
 10 files changed, 58 insertions(+), 41 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
index e0c6121..2fbcd6c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/reader/CarbonFooterReaderV3.java
@@ -17,14 +17,10 @@
 
 package org.apache.carbondata.core.reader;
 
-import java.io.DataInputStream;
 import java.io.IOException;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.format.FileFooter3;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.thrift.TBase;
 
 /**
@@ -36,15 +32,11 @@ public class CarbonFooterReaderV3 {
   //Fact file path
   private String filePath;
 
-  // size of the file
-  private long fileSize;
-
   //start offset of the file footer
   private long footerOffset;
 
-  public CarbonFooterReaderV3(String filePath, long fileSize, long offset) {
+  public CarbonFooterReaderV3(String filePath, long offset) {
     this.filePath = filePath;
-    this.fileSize = fileSize;
     this.footerOffset = offset;
   }
 
@@ -58,21 +50,6 @@ public class CarbonFooterReaderV3 {
     ThriftReader thriftReader = openThriftReader(filePath);
     thriftReader.open();
 
-    // If footer offset is 0, means caller does not set it,
-    // so we read it from the end of the file
-    if (footerOffset == 0) {
-      Configuration conf = FileFactory.getConfiguration();
-      try (DataInputStream reader = FileFactory.getDataInputStream(filePath, 
conf)) {
-        long skipBytes = fileSize - CarbonCommonConstants.LONG_SIZE_IN_BYTE;
-        long skipped = reader.skip(skipBytes);
-        if (skipped != skipBytes) {
-          throw new IOException(String.format(
-              "expect skip %d bytes, but actually skipped %d bytes", 
skipBytes, skipped));
-        }
-        footerOffset = reader.readLong();
-      }
-    }
-
     // Set the offset from where it should read
     thriftReader.setReadOffset(footerOffset);
     FileFooter3 footer = (FileFooter3) thriftReader.read();
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 79daeaa..1bd4d89 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -914,10 +914,15 @@ public final class CarbonUtil {
       boolean forceReadDataFileFooter) throws IOException {
     BlockletDetailInfo detailInfo = tableBlockInfo.getDetailInfo();
     if (detailInfo == null || forceReadDataFileFooter) {
-      AbstractDataFileFooterConverter fileFooterConverter =
-          DataFileFooterConverterFactory.getInstance()
-              .getDataFileFooterConverter(tableBlockInfo.getVersion());
-      return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+      if (tableBlockInfo.getBlockOffset() == 0) {
+        // caller does not set the footer offset, so read index file and 
convert to footer
+        return getDataFileFooterFromIndexFile(tableBlockInfo);
+      } else {
+        AbstractDataFileFooterConverter fileFooterConverter =
+            DataFileFooterConverterFactory.getInstance()
+                .getDataFileFooterConverter(tableBlockInfo.getVersion());
+        return fileFooterConverter.readDataFileFooter(tableBlockInfo);
+      }
     } else {
       DataFileFooter fileFooter = new DataFileFooter();
       
fileFooter.setSchemaUpdatedTimeStamp(detailInfo.getSchemaUpdatedTimeStamp());
@@ -934,6 +939,32 @@ public final class CarbonUtil {
   }
 
   /**
+   * Read index file and convert to footer.
+   * Currently, this is used only in INSERT STAGE command to load data files 
written by SDK
+   */
+  private static DataFileFooter getDataFileFooterFromIndexFile(TableBlockInfo 
tableBlockInfo)
+      throws IOException {
+    String filePath = tableBlockInfo.getFilePath();
+    String dataFilePath = tableBlockInfo.getFilePath();
+    String shardName = CarbonTablePath.getShardName(filePath);
+    String indexFilePath = String
+        .format("%s/%s%s", CarbonTablePath.getParentPath(filePath), shardName,
+            CarbonCommonConstants.UPDATE_INDEX_FILE_EXT);
+    ColumnarFormatVersion version = ColumnarFormatVersion.valueOf((short)3);
+    AbstractDataFileFooterConverter footerConverter =
+        
DataFileFooterConverterFactory.getInstance().getDataFileFooterConverter(version);
+    List<DataFileFooter> footers = footerConverter.getIndexInfo(indexFilePath, 
null, true);
+
+    // find the footer of the input data file (tableBlockInfo)
+    for (DataFileFooter footer : footers) {
+      if 
(footer.getBlockInfo().getTableBlockInfo().getFilePath().equals(dataFilePath)) {
+        return footer;
+      }
+    }
+    throw new RuntimeException("Footer not found in index file");
+  }
+
+  /**
    * Below method will be used to get the number of dimension column
    * in carbon column schema
    *
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
index b917559..65ba609 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -64,8 +64,7 @@ public class DataFileFooterConverterV3 extends 
AbstractDataFileFooterConverter {
     CarbonHeaderReader carbonHeaderReader = new 
CarbonHeaderReader(tableBlockInfo.getFilePath());
     FileHeader fileHeader = carbonHeaderReader.readHeader();
     CarbonFooterReaderV3 reader =
-        new CarbonFooterReaderV3(tableBlockInfo.getFilePath(), 
tableBlockInfo.getBlockLength(),
-            tableBlockInfo.getBlockOffset());
+        new CarbonFooterReaderV3(tableBlockInfo.getFilePath(), 
tableBlockInfo.getBlockOffset());
     FileFooter3 footer = reader.readFooterVersion3();
     return convertDataFileFooter(fileHeader, footer);
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index ea7542e..c25b536 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -715,4 +715,18 @@ public class CarbonTablePath {
           CarbonCommonConstants.FILE_SEPARATOR + taskNo;
     }
   }
+
+  /**
+   * Return the parent path of the input file.
+   * For example, if input file path is /user/warehouse/t1/file.carbondata
+   * then return will be /user/warehouse/t1
+   */
+  public static String getParentPath(String dataFilePath) {
+    int endIndex = 
dataFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
+    if (endIndex > -1) {
+      return dataFilePath.substring(0, endIndex);
+    } else {
+      return dataFilePath;
+    }
+  }
 }
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
index 2054a7b..ee0784b 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableWithBlockletSize.scala
@@ -61,8 +61,7 @@ class TestCreateTableWithBlockletSize extends QueryTest with 
BeforeAndAfterAll {
         .getFileHolder(FileFactory.getFileType(dataFile.getPath))
       val buffer = fileReader
         .readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath), 
dataFile.getSize - 8, 8)
-      val footerReader = new CarbonFooterReaderV3(
-        dataFile.getAbsolutePath, dataFile.getSize, buffer.getLong)
+      val footerReader = new CarbonFooterReaderV3(dataFile.getAbsolutePath, 
buffer.getLong)
       val footer = footerReader.readFooterVersion3
       assertResult(2)(footer.blocklet_index_list.size)
       assertResult(2)(footer.blocklet_info_list3.size)
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index 2834526..2f0acef 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -2570,8 +2570,7 @@ class TestNonTransactionalCarbonTable extends QueryTest 
with BeforeAndAfterAll {
         .getFileHolder(FileFactory.getFileType(dataFile.getPath))
       val buffer = fileReader
         .readByteBuffer(FileFactory.getUpdatedFilePath(dataFile.getPath), 
dataFile.getSize - 8, 8)
-      val footerReader = new CarbonFooterReaderV3(
-        dataFile.getAbsolutePath, dataFile.getSize, buffer.getLong)
+      val footerReader = new CarbonFooterReaderV3(dataFile.getAbsolutePath, 
buffer.getLong)
       val footer = footerReader.readFooterVersion3
       // without page_size configuration there will be only 1 page, now it 
will be more.
       assert(footer.getBlocklet_info_list3.get(0).number_number_of_pages != 1)
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 0d1121d..0f5c4ae 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -220,16 +220,14 @@ case class CarbonInsertFromStageCommand(
           }.map { future =>
             future.get()
           }
-        case SegmentStatus.INSERT_IN_PROGRESS =>
+        case other =>
           // delete entry in table status and load again
-          LOGGER.info(s"Segment $segmentId is in INSERT_IN_PROGRESS state, 
about to delete the " +
+          LOGGER.warn(s"Segment $segmentId is in $other state, about to delete 
the " +
                       s"segment entry and load again")
           val segmentToWrite = 
segments.filterNot(_.getLoadName.equals(segmentId))
           SegmentStatusManager.writeLoadDetailsIntoFile(
             CarbonTablePath.getTableStatusFilePath(table.getTablePath),
             segmentToWrite)
-        case other =>
-          throw new RuntimeException(s"Segment $segmentId is in unexpected 
state: $other")
       }
     } finally {
       if (lock != null) {
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index 2732d3a..c692f6e 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -320,7 +320,7 @@ public class CarbonSchemaReader {
         
fileReader.readByteBuffer(FileFactory.getUpdatedFilePath(dataFilePath), 
fileSize - 8, 8);
     fileReader.finish();
     CarbonFooterReaderV3 footerReader =
-        new CarbonFooterReaderV3(dataFilePath, fileSize, buffer.getLong());
+        new CarbonFooterReaderV3(dataFilePath, buffer.getLong());
     FileFooter3 footer = footerReader.readFooterVersion3();
     if (null != footer.getExtra_info()) {
       return 
footer.getExtra_info().get(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO)
diff --git 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index ef2ec5e..533480c 100644
--- 
a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -428,7 +428,7 @@ public class CSVCarbonWriterTest {
           dataFile.getPath()), dataFile.getSize() - 8, 8);
       fileReader.finish();
       CarbonFooterReaderV3 footerReader =
-          new CarbonFooterReaderV3(dataFile.getAbsolutePath(), 
dataFile.getSize(), buffer.getLong());
+          new CarbonFooterReaderV3(dataFile.getAbsolutePath(), 
buffer.getLong());
       FileFooter3 footer = footerReader.readFooterVersion3();
       Assert.assertEquals(2, footer.blocklet_index_list.size());
       Assert.assertEquals(2, footer.blocklet_info_list3.size());
diff --git a/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java 
b/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
index a1fdad5..4ed3945 100644
--- a/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
+++ b/tools/cli/src/main/java/org/apache/carbondata/tool/DataFile.java
@@ -170,7 +170,7 @@ class DataFile {
     this.footerOffset = buffer.getLong();
     this.footerSizeInBytes = this.fileSizeInBytes - footerOffset;
     CarbonFooterReaderV3 footerReader =
-        new CarbonFooterReaderV3(dataFile.getAbsolutePath(), 
dataFile.getSize(), footerOffset);
+        new CarbonFooterReaderV3(dataFile.getAbsolutePath(), footerOffset);
     return footerReader.readFooterVersion3();
   }
 

Reply via email to