This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new e465c73  PARQUET-1851: fix parquet metadata converter NPE (#852)
e465c73 is described below

commit e465c73d87f0b6ad190d494eea48394950fae759
Author: Chen, Junjie <[email protected]>
AuthorDate: Wed Jan 13 20:00:50 2021 +0800

    PARQUET-1851: fix parquet metadata converter NPE (#852)
---
 .../format/converter/ParquetMetadataConverter.java | 72 +++++++++++-----------
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 58 +++++++++--------
 .../parquet/hadoop/TestParquetFileWriter.java      | 21 +++++++
 3 files changed, 88 insertions(+), 63 deletions(-)
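
The change closes two related NPE paths: ParquetMetadataConverter.addRowGroup() no longer
calls isEmpty() on a statistics object that can legitimately be null, and
ParquetFileWriter.endBlock() now fails fast on a row group holding zero records instead of
writing metadata a reader would later trip over. A minimal sketch of the converter-side
guard, as a hypothetical helper (StatsGuard and hasUsableStatistics are illustration
names, not part of the commit):

    import org.apache.parquet.column.statistics.Statistics;

    final class StatsGuard {
      // Old code tested only !stats.isEmpty(); a chunk written without statistics
      // returns null here, which is what triggered the NPE.
      static boolean hasUsableStatistics(Statistics<?> stats) {
        return stats != null && !stats.isEmpty();
      }
    }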

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index b4a4f8b..0f57d5d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -487,7 +487,7 @@ public class ParquetMetadataConverter {
 
  private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block,
       InternalFileEncryptor fileEncryptor) {
-    
+
     //rowGroup.total_byte_size = ;
     List<ColumnChunkMetaData> columns = block.getColumns();
     List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
@@ -520,18 +520,18 @@ public class ParquetMetadataConverter {
        metaData.setDictionary_page_offset(columnMetaData.getDictionaryPageOffset());
       }
       metaData.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
-      if (!columnMetaData.getStatistics().isEmpty()) {
+      if (columnMetaData.getStatistics() != null && !columnMetaData.getStatistics().isEmpty()) {
        metaData.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
      }
      if (columnMetaData.getEncodingStats() != null) {
        metaData.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
       }
-      
+
       if (!encryptMetaData) {
         columnChunk.setMeta_data(metaData);
       }  else {
-        // Serialize and encrypt ColumnMetadata separately 
-        byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), 
+        // Serialize and encrypt ColumnMetadata separately
+        byte[] columnMetaDataAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(),
            ModuleType.ColumnMetaData, rowGroupOrdinal, columnSetup.getOrdinal(), -1);
         if (null == tempOutStream) {
           tempOutStream = new ByteArrayOutputStream();
@@ -556,7 +556,7 @@ public class ParquetMetadataConverter {
       if (writeCryptoMetadata) {
         columnChunk.setCrypto_metadata(columnSetup.getColumnCryptoMetaData());
       }
-      
+
 //      columnChunk.meta_data.index_page_offset = ;
 //      columnChunk.meta_data.key_value_metadata = ; // nothing yet
 
@@ -574,7 +574,7 @@ public class ParquetMetadataConverter {
       parquetColumns.add(columnChunk);
     }
    RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount());
-    rowGroup.setFile_offset(block.getStartingPos()); 
+    rowGroup.setFile_offset(block.getStartingPos());
     rowGroup.setTotal_compressed_size(block.getCompressedSize());
     rowGroup.setOrdinal((short) rowGroupOrdinal);
     rowGroups.add(rowGroup);
@@ -1226,13 +1226,13 @@ public class ParquetMetadataConverter {
     for (RowGroup rowGroup : rowGroups) {
       long totalSize = 0;
       long startIndex;
-      
+
       if (rowGroup.isSetFile_offset()) {
         startIndex = rowGroup.getFile_offset();
       } else {
         startIndex = getOffset(rowGroup.getColumns().get(0));
       }
-      
+
       if (rowGroup.isSetTotal_compressed_size()) {
         totalSize = rowGroup.getTotal_compressed_size();
       } else {
@@ -1240,13 +1240,13 @@ public class ParquetMetadataConverter {
           totalSize += col.getMeta_data().getTotal_compressed_size();
         }
       }
-      
+
       long midPoint = startIndex + totalSize / 2;
       if (filter.contains(midPoint)) {
         newRowGroups.add(rowGroup);
       }
     }
-    
+
     metaData.setRow_groups(newRowGroups);
     return metaData;
   }
@@ -1262,7 +1262,7 @@ public class ParquetMetadataConverter {
       } else {
         startIndex = getOffset(rowGroup.getColumns().get(0));
       }
-      
+
       if (filter.contains(startIndex)) {
         newRowGroups.add(rowGroup);
       }
@@ -1277,7 +1277,7 @@ public class ParquetMetadataConverter {
     }
     return getOffset(rowGroup.getColumns().get(0));
   }
-  
+
   // Visible for testing
   static long getOffset(ColumnChunk columnChunk) {
     ColumnMetaData md = columnChunk.getMeta_data();
@@ -1288,16 +1288,16 @@ public class ParquetMetadataConverter {
     return offset;
   }
 
-  private static void verifyFooterIntegrity(InputStream from, InternalFileDecryptor fileDecryptor, 
+  private static void verifyFooterIntegrity(InputStream from, InternalFileDecryptor fileDecryptor,
       int combinedFooterLength) throws IOException {
-    
+
     byte[] nonce = new byte[AesCipher.NONCE_LENGTH];
     from.read(nonce);
     byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH];
     from.read(gcmTag);
-    
+
    AesGcmEncryptor footerSigner =  fileDecryptor.createSignedFooterEncryptor();
-    
+
    byte[] footerAndSignature = ((ByteBufferInputStream) from).slice(0).array();
    int footerSignatureLength = AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH;
    byte[] serializedFooter = new byte[combinedFooterLength - footerSignatureLength];
@@ -1306,7 +1306,7 @@ public class ParquetMetadataConverter {
    byte[] signedFooterAAD = AesCipher.createFooterAAD(fileDecryptor.getFileAAD());
    byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAAD);
    byte[] calculatedTag = new byte[AesCipher.GCM_TAG_LENGTH];
-    System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - AesCipher.GCM_TAG_LENGTH, 
+    System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - AesCipher.GCM_TAG_LENGTH,
        calculatedTag, 0, AesCipher.GCM_TAG_LENGTH);
    if (!Arrays.equals(gcmTag, calculatedTag)) {
      throw new TagVerificationException("Signature mismatch in plaintext footer");
@@ -1318,12 +1318,12 @@ public class ParquetMetadataConverter {
   }
 
  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
-      final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter, 
+      final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter,
      final int combinedFooterLength) throws IOException {
-    
+
    final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null);
    final byte[] encryptedFooterAAD = (encryptedFooter? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
-    
+
    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
       @Override
       public FileMetaData visit(NoFilter filter) throws IOException {
@@ -1346,7 +1346,7 @@ public class ParquetMetadataConverter {
       }
     });
     LOG.debug("{}", fileMetaData);
-    
+
     if (!encryptedFooter && null != fileDecryptor) {
       if (!fileMetaData.isSetEncryption_algorithm()) { // Plaintext file
         fileDecryptor.setPlaintextFile();
@@ -1356,19 +1356,19 @@ public class ParquetMetadataConverter {
         }
       } else {  // Encrypted file with plaintext footer
         // if no fileDecryptor, can still read plaintext columns
-        fileDecryptor.setFileCryptoMetaData(fileMetaData.getEncryption_algorithm(), false, 
+        fileDecryptor.setFileCryptoMetaData(fileMetaData.getEncryption_algorithm(), false,
             fileMetaData.getFooter_signing_key_metadata());
         if (fileDecryptor.checkFooterIntegrity()) {
           verifyFooterIntegrity(from, fileDecryptor, combinedFooterLength);
         }
       }
     }
-    
+
    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter);
    if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
     return parquetMetadata;
   }
-  
+
  public ColumnChunkMetaData buildColumnChunkMetaData(ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, String createdBy) {
     return ColumnChunkMetaData.get(
         columnPath,
@@ -1391,12 +1391,12 @@ public class ParquetMetadataConverter {
     return fromParquetMetadata(parquetMetadata, null, false);
   }
 
-  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, 
+  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
      InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException {
    MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
     List<RowGroup> row_groups = parquetMetadata.getRow_groups();
-    
+
     if (row_groups != null) {
       for (RowGroup rowGroup : row_groups) {
         BlockMetaData blockMetaData = new BlockMetaData();
@@ -1420,7 +1420,7 @@ public class ParquetMetadataConverter {
           ColumnChunkMetaData column = null;
           ColumnPath columnPath = null;
           boolean encryptedMetadata = false;
-          
+
           if (null == cryptoMetaData) { // Plaintext column
             columnPath = getPath(metaData);
             if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
@@ -1446,10 +1446,10 @@ public class ParquetMetadataConverter {
               encryptedMetadata = true;
             }
           }
-          
+
           String createdBy = parquetMetadata.getCreated_by();
          if (!encryptedMetadata) { // unencrypted column, or encrypted with footer key
-            column = buildColumnChunkMetaData(metaData, columnPath, 
+            column = buildColumnChunkMetaData(metaData, columnPath,
                messageType.getType(columnPath.toArray()).asPrimitiveType(), createdBy);
             column.setRowGroupOrdinal(rowGroup.getOrdinal());
             column.setBloomFilterOffset(metaData.bloom_filter_offset);
@@ -1460,14 +1460,14 @@ public class ParquetMetadataConverter {
             byte[] columnKeyMetadata = columnKeyStruct.getKey_metadata();
            columnPath = ColumnPath.get(pathList.toArray(new String[pathList.size()]));
            byte[] encryptedMetadataBuffer = columnChunk.getEncrypted_column_metadata();
-            column = ColumnChunkMetaData.getWithEncryptedMetadata(this, columnPath, 
-                messageType.getType(columnPath.toArray()).asPrimitiveType(), encryptedMetadataBuffer, 
+            column = ColumnChunkMetaData.getWithEncryptedMetadata(this, columnPath,
+                messageType.getType(columnPath.toArray()).asPrimitiveType(), encryptedMetadataBuffer,
                columnKeyMetadata, fileDecryptor, rowGroup.getOrdinal(), columnOrdinal, createdBy);
           }
-          
+
           column.setColumnIndexReference(toColumnIndexReference(columnChunk));
           column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
-          
+
           // TODO
           // index_page_offset
           // key_value_metadata
@@ -1712,7 +1712,7 @@ public class ParquetMetadataConverter {
                                       valueCount,
                                       rlEncoding,
                                       dlEncoding,
-                                      valuesEncoding), 
+                                      valuesEncoding),
                     to, blockEncryptor, AAD);
   }
 
@@ -1746,7 +1746,7 @@ public class ParquetMetadataConverter {
                                       rlEncoding,
                                       dlEncoding,
                                       valuesEncoding,
-                                      crc), 
+                                      crc),
                     to, blockEncryptor, AAD);
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c06e7a8..3892f3d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -130,7 +130,7 @@ public class ParquetFileWriter {
 
   // The Bloom filters
  private final List<Map<String, BloomFilter>> bloomFilters = new ArrayList<>();
-  
+
   // The file encryptor
   private final InternalFileEncryptor fileEncryptor;
 
@@ -301,12 +301,12 @@ public class ParquetFileWriter {
    */
   public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
       long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
-      int statisticsTruncateLength, boolean pageWriteChecksumEnabled) 
+      int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
           throws IOException{
    this(file, schema, mode, rowGroupSize, maxPaddingSize, columnIndexTruncateLength,
       statisticsTruncateLength, pageWriteChecksumEnabled, null);
   }
-  
+
   public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
                           long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
                           int statisticsTruncateLength, boolean pageWriteChecksumEnabled,
@@ -336,7 +336,7 @@ public class ParquetFileWriter {
     this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
 
    this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
-    
+
     if (null == encryptionProperties) {
       this.fileEncryptor = null;
     } else {
@@ -394,7 +394,7 @@ public class ParquetFileWriter {
     }
     out.write(magic);
   }
-  
+
   InternalFileEncryptor getEncryptor() {
     return fileEncryptor;
   }
@@ -456,8 +456,8 @@ public class ParquetFileWriter {
  public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
     writeDictionaryPage(dictionaryPage, null, null);
   }
-  
-  public void writeDictionaryPage(DictionaryPage dictionaryPage, 
+
+  public void writeDictionaryPage(DictionaryPage dictionaryPage,
      BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
    state = state.write();
    LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
@@ -474,7 +474,7 @@ public class ParquetFileWriter {
         dictionaryPage.getEncoding(),
         (int) crc.getValue(),
         out,
-        headerBlockEncryptor, 
+        headerBlockEncryptor,
         AAD);
     } else {
       metadataConverter.writeDictionaryPageHeader(
@@ -483,7 +483,7 @@ public class ParquetFileWriter {
         dictionaryPage.getDictionarySize(),
         dictionaryPage.getEncoding(),
         out,
-        headerBlockEncryptor, 
+        headerBlockEncryptor,
         AAD);
     }
     long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
@@ -757,7 +757,7 @@ public class ParquetFileWriter {
      uncompressedTotalPageSize, compressedTotalPageSize, totalStats, columnIndexBuilder, offsetIndexBuilder,
       bloomFilter, rlEncodings, dlEncodings, dataEncodings, null, 0, 0, null);
   }
-  
+
   void writeColumnChunk(ColumnDescriptor descriptor,
       long valueCount,
       CompressionCodecName compressionCodecName,
@@ -773,7 +773,7 @@ public class ParquetFileWriter {
       Set<Encoding> dlEncodings,
       List<Encoding> dataEncodings,
       BlockCipher.Encryptor headerBlockEncryptor,
-      int rowGroupOrdinal, 
+      int rowGroupOrdinal,
       int columnOrdinal,
       byte[] fileAAD) throws IOException {
     startColumn(descriptor, valueCount, compressionCodecName);
@@ -782,7 +782,7 @@ public class ParquetFileWriter {
     if (dictionaryPage != null) {
       byte[] dictonaryPageHeaderAAD = null;
       if (null != headerBlockEncryptor) {
-        dictonaryPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader, 
+        dictonaryPageHeaderAAD = AesCipher.createModuleAAD(fileAAD, ModuleType.DictionaryPageHeader,
            rowGroupOrdinal, columnOrdinal, -1);
      }
      writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD);
@@ -860,6 +860,10 @@ public class ParquetFileWriter {
    * @throws IOException if there is an error while writing
    */
   public void endBlock() throws IOException {
+    if (currentRecordCount == 0) {
+      throw new ParquetEncodingException("End block with zero record");
+    }
+
     state = state.endBlock();
     LOG.debug("{}: end block", out.getPos());
     currentBlock.setRowCount(currentRecordCount);
@@ -1119,7 +1123,7 @@ public class ParquetFileWriter {
          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
          if (columnEncryptionSetup.isEncrypted()) {
            columnIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
-            columnIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex, 
+            columnIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex,
                 block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
           }
         }
@@ -1159,7 +1163,7 @@ public class ParquetFileWriter {
          InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, cIndex);
          if (columnEncryptionSetup.isEncrypted()) {
            offsetIndexEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
-            offsetIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex, 
+            offsetIndexAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex,
                 block.getOrdinal(), columnEncryptionSetup.getOrdinal(), -1);
           }
         }
@@ -1190,7 +1194,7 @@ public class ParquetFileWriter {
 
         long offset = out.getPos();
         column.setBloomFilterOffset(offset);
-        
+
         BlockCipher.Encryptor bloomFilterEncryptor = null;
         byte[] bloomFilterHeaderAAD = null;
         byte[] bloomFilterBitsetAAD = null;
@@ -1199,16 +1203,16 @@ public class ParquetFileWriter {
           if (columnEncryptionSetup.isEncrypted()) {
            bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
            int columnOrdinal = columnEncryptionSetup.getOrdinal();
-            bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader, 
+            bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader,
                block.getOrdinal(), columnOrdinal, -1);
-            bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset, 
+            bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset,
                 block.getOrdinal(), columnOrdinal, -1);
           }
         }
-        
-        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out, 
+
+        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out,
             bloomFilterEncryptor, bloomFilterHeaderAAD);
-        
+
         ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
         bloomFilter.writeTo(tempOutStream);
         byte[] serializedBitset = tempOutStream.toByteArray();
@@ -1219,12 +1223,12 @@ public class ParquetFileWriter {
       }
     }
   }
-  
+
  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
       InternalFileEncryptor fileEncryptor) throws IOException {
-    
+
    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-    
+
     // Unencrypted file
     if (null == fileEncryptor) {
       long footerIndex = out.getPos();
@@ -1235,11 +1239,11 @@ public class ParquetFileWriter {
       out.write(MAGIC);
       return;
     }
-    
+
     org.apache.parquet.format.FileMetaData parquetMetadata =
        metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
-    
-    // Encrypted file with plaintext footer 
+
+    // Encrypted file with plaintext footer
     if (!fileEncryptor.isFooterEncrypted()) {
      long footerIndex = out.getPos();
      parquetMetadata.setEncryption_algorithm(fileEncryptor.getEncryptionAlgorithm());
@@ -1255,7 +1259,7 @@ public class ParquetFileWriter {
      byte[] encryptedFooter = fileEncryptor.getSignedFooterEncryptor().encrypt(serializedFooter, footerAAD);
      byte[] signature = new byte[AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH];
      System.arraycopy(encryptedFooter, ModuleCipherFactory.SIZE_LENGTH, signature, 0, AesCipher.NONCE_LENGTH); // copy Nonce
-      System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH, 
+      System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH,
          signature, AesCipher.NONCE_LENGTH, AesCipher.GCM_TAG_LENGTH); // copy GCM Tag
       out.write(serializedFooter);
       out.write(signature);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 2b78917..c41c8bf 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -30,6 +30,7 @@ import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.io.ParquetEncodingException;
 import org.junit.Assume;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,6 +62,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.Callable;
 
 import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics;
 import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
@@ -222,6 +224,25 @@ public class TestParquetFileWriter {
   }
 
   @Test
+  public void testWriteEmptyBlock() throws Exception {
+    File testFile = temp.newFile();
+    testFile.delete();
+
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(0);
+
+    TestUtils.assertThrows("End block with zero record", ParquetEncodingException.class,
+      (Callable<Void>) () -> {
+      w.endBlock();
+      return null;
+    });
+  }
+
+  @Test
   public void testBloomFilterWriteRead() throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
     File testFile = temp.newFile();

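For readers who want to see the new endBlock() contract outside JUnit, a minimal
self-contained sketch (the output path and class name are placeholders; the constructor is
the same Configuration-based one the test above uses):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.io.ParquetEncodingException;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class EmptyBlockDemo {
      public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message test { required binary foo; }");
        ParquetFileWriter w = new ParquetFileWriter(
            new Configuration(), schema, new Path("/tmp/empty-block-demo.parquet"));
        w.start();
        w.startBlock(0);  // declare a row group that will receive no records
        try {
          w.endBlock();   // after this commit: throws instead of writing a bad row group
        } catch (ParquetEncodingException e) {
          System.out.println(e.getMessage());  // "End block with zero record"
        }
      }
    }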