This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0902d45  [CARBONDATA-3460] Fixed EOFException in CarbonScanRDD
0902d45 is described below

commit 0902d459a30e0fdd72868b2956eeb1c6b3b06346
Author: kunal642 <kunalkapoor...@gmail.com>
AuthorDate: Wed Jul 3 10:54:08 2019 +0530

    [CARBONDATA-3460] Fixed EOFException in CarbonScanRDD
    
    Problem: Delete delta information was not written properly in the 
OutputStream due the flag based writing.
    
    Solution: Always write the delete delta info, the size of the array will be 
the deciding factor whether to read further or not.
    
    This closes #3316
---
 .../core/indexstore/ExtendedBlocklet.java          |  1 -
 .../apache/carbondata/hadoop/CarbonInputSplit.java | 52 +++++++++++-----------
 2 files changed, 26 insertions(+), 27 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
index d97148d..a85423b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/ExtendedBlocklet.java
@@ -177,7 +177,6 @@ public class ExtendedBlocklet extends Blocklet {
       DataOutputStream dos = new DataOutputStream(ebos);
       inputSplit.setFilePath(null);
       inputSplit.setBucketId(null);
-      inputSplit.setWriteDeleteDelta(false);
       if (inputSplit.isBlockCache()) {
         inputSplit.updateFooteroffset();
         inputSplit.updateBlockLength();
diff --git 
a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java 
b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index da1bc2c..edbfcfe 100644
--- a/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/core/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.carbondata.hadoop;
 
 import java.io.ByteArrayInputStream;
@@ -150,8 +151,6 @@ public class CarbonInputSplit extends FileSplit
    */
   private int rowCount;
 
-  private boolean writeDeleteDelta = true;
-
   public CarbonInputSplit() {
     segment = null;
     taskId = "0";
@@ -195,7 +194,13 @@ public class CarbonInputSplit extends FileSplit
     this.version = ColumnarFormatVersion.valueOf(in.readShort());
     // will be removed after count(*) optmization in case of index server
     this.rowCount = in.readInt();
-    this.writeDeleteDelta = in.readBoolean();
+    if (in.readBoolean()) {
+      int numberOfDeleteDeltaFiles = in.readInt();
+      deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
+      for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
+        deleteDeltaFiles[i] = in.readUTF();
+      }
+    }
     // after deseralizing required field get the start position of field which 
will be only used
     // in executor
     int leftoverPosition = underlineStream.getPosition();
@@ -359,7 +364,13 @@ public class CarbonInputSplit extends FileSplit
       this.length = in.readLong();
       this.version = ColumnarFormatVersion.valueOf(in.readShort());
       this.rowCount = in.readInt();
-      this.writeDeleteDelta = in.readBoolean();
+      if (in.readBoolean()) {
+        int numberOfDeleteDeltaFiles = in.readInt();
+        deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
+        for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
+          deleteDeltaFiles[i] = in.readUTF();
+        }
+      }
       this.bucketId = in.readUTF();
     }
     this.blockletId = in.readUTF();
@@ -379,13 +390,6 @@ public class CarbonInputSplit extends FileSplit
       validBlockletIds.add((int) in.readShort());
     }
     this.isLegacyStore = in.readBoolean();
-    if (writeDeleteDelta) {
-      int numberOfDeleteDeltaFiles = in.readInt();
-      deleteDeltaFiles = new String[numberOfDeleteDeltaFiles];
-      for (int i = 0; i < numberOfDeleteDeltaFiles; i++) {
-        deleteDeltaFiles[i] = in.readUTF();
-      }
-    }
   }
 
   @Override public void write(DataOutput out) throws IOException {
@@ -397,11 +401,10 @@ public class CarbonInputSplit extends FileSplit
       out.writeLong(length);
       out.writeShort(version.number());
       out.writeInt(rowCount);
-      out.writeBoolean(writeDeleteDelta);
+      writeDeleteDeltaFile(out);
       out.writeUTF(bucketId);
       out.writeUTF(blockletId);
       out.write(serializeData, offset, actualLen);
-      writeDeleteDeltaFile(out);
       return;
     }
     // please refer writeDetailInfo doc
@@ -419,7 +422,7 @@ public class CarbonInputSplit extends FileSplit
     } else {
       out.writeInt(0);
     }
-    out.writeBoolean(writeDeleteDelta);
+    writeDeleteDeltaFile(out);
     if (null != bucketId) {
       out.writeUTF(bucketId);
     }
@@ -442,18 +445,19 @@ public class CarbonInputSplit extends FileSplit
       out.writeShort(blockletId);
     }
     out.writeBoolean(isLegacyStore);
-    writeDeleteDeltaFile(out);
   }
 
   private void writeDeleteDeltaFile(DataOutput out) throws IOException {
-    if (!writeDeleteDelta) {
-      return;
-    }
-    out.writeInt(null != deleteDeltaFiles ? deleteDeltaFiles.length : 0);
-    if (null != deleteDeltaFiles) {
-      for (int i = 0; i < deleteDeltaFiles.length; i++) {
-        out.writeUTF(deleteDeltaFiles[i]);
+    if (deleteDeltaFiles != null) {
+      out.writeBoolean(true);
+      out.writeInt(deleteDeltaFiles.length);
+      if (null != deleteDeltaFiles) {
+        for (int i = 0; i < deleteDeltaFiles.length; i++) {
+          out.writeUTF(deleteDeltaFiles[i]);
+        }
       }
+    } else {
+      out.writeBoolean(false);
     }
   }
 
@@ -586,7 +590,6 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public void setDeleteDeltaFiles(String[] deleteDeltaFiles) {
-    this.writeDeleteDelta = true;
     this.deleteDeltaFiles = deleteDeltaFiles;
   }
 
@@ -879,7 +882,4 @@ public class CarbonInputSplit extends FileSplit
     this.bucketId = bucketId;
   }
 
-  public void setWriteDeleteDelta(boolean writeDeleteDelta) {
-    this.writeDeleteDelta = writeDeleteDelta;
-  }
 }

Reply via email to