[ 
https://issues.apache.org/jira/browse/PARQUET-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112252#comment-17112252
 ] 

ASF GitHub Bot commented on PARQUET-1229:
-----------------------------------------

gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428008353



##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -499,55 +528,100 @@ public static final ParquetMetadata 
readFooter(Configuration configuration, File
    */
   @Deprecated
   public static final ParquetMetadata readFooter(InputFile file, 
MetadataFilter filter) throws IOException {
+    return readFooter(file, filter, null);
+  }
+
+  @Deprecated
+  public static final ParquetMetadata readFooter(InputFile file, 
MetadataFilter filter, 

Review comment:
       Why do we introduce new public methods that are deprecated already?

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
##########
@@ -1465,23 +1674,38 @@ public void writeDataPageV2Header(
             dataEncoding,
             rlByteLength, dlByteLength), to);
   }
-
+  

Review comment:
       Please check your code so that it does not introduce any trailing whitespace.

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -442,7 +460,13 @@ static ParquetMetadata readSummaryMetadata(Configuration 
configuration, Path bas
    */
   @Deprecated
   public static final ParquetMetadata readFooter(Configuration configuration, 
Path file) throws IOException {
-    return readFooter(configuration, file, NO_FILTER);
+    return readFooter(configuration, file, getDecryptionProperties(file, 
configuration));
+  }
+
+  @Deprecated
+  public static final ParquetMetadata readFooter(Configuration configuration, 
Path file, 

Review comment:
       Why do we introduce new public methods that are deprecated already?

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
##########
@@ -1185,70 +1275,189 @@ static long getOffset(ColumnChunk columnChunk) {
     return offset;
   }
 
+  private static void verifyFooterIntegrity(InputStream from, 
InternalFileDecryptor fileDecryptor, 
+      int combinedFooterLength) throws IOException {
+    
+    byte[] nonce = new byte[AesCipher.NONCE_LENGTH];
+    from.read(nonce);
+    byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH];
+    from.read(gcmTag);
+    
+    AesGcmEncryptor footerSigner =  
fileDecryptor.createSignedFooterEncryptor();
+    
+    byte[] footerAndSignature = ((ByteBufferInputStream) 
from).slice(0).array();
+    int footerSignatureLength = AesCipher.NONCE_LENGTH + 
AesCipher.GCM_TAG_LENGTH;
+    byte[] serializedFooter = new byte[combinedFooterLength - 
footerSignatureLength];
+    System.arraycopy(footerAndSignature, 0, serializedFooter, 0, 
serializedFooter.length);
+
+    byte[] signedFooterAAD = 
AesCipher.createFooterAAD(fileDecryptor.getFileAAD());
+    byte[] encryptedFooterBytes = footerSigner.encrypt(false, 
serializedFooter, nonce, signedFooterAAD);
+    byte[] calculatedTag = new byte[AesCipher.GCM_TAG_LENGTH];
+    System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - 
AesCipher.GCM_TAG_LENGTH, 
+        calculatedTag, 0, AesCipher.GCM_TAG_LENGTH);
+    if (!Arrays.equals(gcmTag, calculatedTag)) {
+      throw new TagVerificationException("Signature mismatch in plaintext 
footer");
+    }
+  }
+
   public ParquetMetadata readParquetMetadata(final InputStream from, 
MetadataFilter filter) throws IOException {
+    return readParquetMetadata(from, filter, null, false, 0);
+  }
+
+  public ParquetMetadata readParquetMetadata(final InputStream from, 
MetadataFilter filter,
+      final InternalFileDecryptor fileDecryptor, final boolean 
encryptedFooter, 
+      final int combinedFooterLength) throws IOException {
+    
+    final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? 
fileDecryptor.fetchFooterDecryptor() : null);
+    final byte[] encryptedFooterAAD = (encryptedFooter? 
AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
+    
     FileMetaData fileMetaData = filter.accept(new 
MetadataFilterVisitor<FileMetaData, IOException>() {
       @Override
       public FileMetaData visit(NoFilter filter) throws IOException {
-        return readFileMetaData(from);
+        return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
       }
 
       @Override
       public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-        return readFileMetaData(from, true);
+        return readFileMetaData(from, true, footerDecryptor, 
encryptedFooterAAD);
       }
 
       @Override
       public FileMetaData visit(OffsetMetadataFilter filter) throws 
IOException {
-        return filterFileMetaDataByStart(readFileMetaData(from), filter);
+        return filterFileMetaDataByStart(readFileMetaData(from, 
footerDecryptor, encryptedFooterAAD), filter);
       }
 
       @Override
       public FileMetaData visit(RangeMetadataFilter filter) throws IOException 
{
-        return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
+        return filterFileMetaDataByMidpoint(readFileMetaData(from, 
footerDecryptor, encryptedFooterAAD), filter);
       }
     });
     LOG.debug("{}", fileMetaData);
-    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
+    
+    if (!encryptedFooter && null != fileDecryptor) {
+      if (!fileMetaData.isSetEncryption_algorithm()) { // Plaintext file
+        fileDecryptor.setPlaintextFile();
+        // Done to detect files that were not encrypted by mistake
+        if (!fileDecryptor.plaintextFilesAllowed()) {
+          throw new IOException("Applying decryptor on plaintext file");
+        }
+      } else {  // Encrypted file with plaintext footer
+        // if no fileDecryptor, can still read plaintext columns
+        
fileDecryptor.setFileCryptoMetaData(fileMetaData.getEncryption_algorithm(), 
false, 
+            fileMetaData.getFooter_signing_key_metadata());
+        if (fileDecryptor.checkFooterIntegrity()) {
+          verifyFooterIntegrity(from, fileDecryptor, combinedFooterLength);
+        }
+      }
+    }
+    
+    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, 
fileDecryptor, encryptedFooter);
     if (LOG.isDebugEnabled()) 
LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
     return parquetMetadata;
   }
+  
+  public ColumnChunkMetaData buildColumnChunkMetaData(ColumnMetaData metaData, 
ColumnPath columnPath, PrimitiveType type, String createdBy) {
+    return ColumnChunkMetaData.get(
+        columnPath,
+        type,
+        fromFormatCodec(metaData.codec),
+        convertEncodingStats(metaData.getEncoding_stats()),
+        fromFormatEncodings(metaData.encodings),
+        fromParquetStatistics(
+            createdBy,
+            metaData.statistics,
+            type),
+        metaData.data_page_offset,
+        metaData.dictionary_page_offset,
+        metaData.num_values,
+        metaData.total_compressed_size,
+        metaData.total_uncompressed_size);
+  }
 
   public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) 
throws IOException {
+    return fromParquetMetadata(parquetMetadata, null, false);
+  }
+
+  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, 
+      InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws 
IOException {
     MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), 
parquetMetadata.getColumn_orders());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
     List<RowGroup> row_groups = parquetMetadata.getRow_groups();
+    
     if (row_groups != null) {
       for (RowGroup rowGroup : row_groups) {
         BlockMetaData blockMetaData = new BlockMetaData();
         blockMetaData.setRowCount(rowGroup.getNum_rows());
         blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+        // not set in legacy files
+        if (rowGroup.isSetOrdinal()) {
+          blockMetaData.setOrdinal(rowGroup.getOrdinal());
+        }
         List<ColumnChunk> columns = rowGroup.getColumns();
         String filePath = columns.get(0).getFile_path();
+        short columnOrdinal = -1;
         for (ColumnChunk columnChunk : columns) {
+          columnOrdinal++;
           if ((filePath == null && columnChunk.getFile_path() != null)
               || (filePath != null && 
!filePath.equals(columnChunk.getFile_path()))) {
             throw new ParquetDecodingException("all column chunks of the same 
row group must be in the same file for now");
           }
           ColumnMetaData metaData = columnChunk.meta_data;
-          ColumnPath path = getPath(metaData);
-          ColumnChunkMetaData column = ColumnChunkMetaData.get(
-              path,
-              messageType.getType(path.toArray()).asPrimitiveType(),
-              fromFormatCodec(metaData.codec),
-              convertEncodingStats(metaData.getEncoding_stats()),
-              fromFormatEncodings(metaData.encodings),
-              fromParquetStatistics(
-                  parquetMetadata.getCreated_by(),
-                  metaData.statistics,
-                  messageType.getType(path.toArray()).asPrimitiveType()),
-              metaData.data_page_offset,
-              metaData.dictionary_page_offset,
-              metaData.num_values,
-              metaData.total_compressed_size,
-              metaData.total_uncompressed_size);
+          ColumnCryptoMetaData cryptoMetaData = 
columnChunk.getCrypto_metadata();
+          ColumnChunkMetaData column = null;
+          ColumnPath columnPath = null;
+          boolean encryptedMetadata = false;
+          
+          if (null == cryptoMetaData) { // Plaintext column
+            if (null == metaData) {
+              throw new IOException("ColumnMetaData not set in plaintext 
column");

Review comment:
       There was no such check in the previous code. Strictly speaking, this is a 
breaking change: a `NullPointerException` was thrown before, where an 
`IOException` is thrown now. 

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
##########
@@ -1185,70 +1275,189 @@ static long getOffset(ColumnChunk columnChunk) {
     return offset;
   }
 
+  private static void verifyFooterIntegrity(InputStream from, 
InternalFileDecryptor fileDecryptor, 
+      int combinedFooterLength) throws IOException {
+    
+    byte[] nonce = new byte[AesCipher.NONCE_LENGTH];
+    from.read(nonce);
+    byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH];
+    from.read(gcmTag);
+    
+    AesGcmEncryptor footerSigner =  
fileDecryptor.createSignedFooterEncryptor();
+    
+    byte[] footerAndSignature = ((ByteBufferInputStream) 
from).slice(0).array();
+    int footerSignatureLength = AesCipher.NONCE_LENGTH + 
AesCipher.GCM_TAG_LENGTH;
+    byte[] serializedFooter = new byte[combinedFooterLength - 
footerSignatureLength];
+    System.arraycopy(footerAndSignature, 0, serializedFooter, 0, 
serializedFooter.length);
+
+    byte[] signedFooterAAD = 
AesCipher.createFooterAAD(fileDecryptor.getFileAAD());
+    byte[] encryptedFooterBytes = footerSigner.encrypt(false, 
serializedFooter, nonce, signedFooterAAD);
+    byte[] calculatedTag = new byte[AesCipher.GCM_TAG_LENGTH];
+    System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - 
AesCipher.GCM_TAG_LENGTH, 
+        calculatedTag, 0, AesCipher.GCM_TAG_LENGTH);
+    if (!Arrays.equals(gcmTag, calculatedTag)) {
+      throw new TagVerificationException("Signature mismatch in plaintext 
footer");
+    }
+  }
+
   public ParquetMetadata readParquetMetadata(final InputStream from, 
MetadataFilter filter) throws IOException {
+    return readParquetMetadata(from, filter, null, false, 0);
+  }
+
+  public ParquetMetadata readParquetMetadata(final InputStream from, 
MetadataFilter filter,
+      final InternalFileDecryptor fileDecryptor, final boolean 
encryptedFooter, 
+      final int combinedFooterLength) throws IOException {
+    
+    final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? 
fileDecryptor.fetchFooterDecryptor() : null);
+    final byte[] encryptedFooterAAD = (encryptedFooter? 
AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
+    
     FileMetaData fileMetaData = filter.accept(new 
MetadataFilterVisitor<FileMetaData, IOException>() {
       @Override
       public FileMetaData visit(NoFilter filter) throws IOException {
-        return readFileMetaData(from);
+        return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
       }
 
       @Override
       public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-        return readFileMetaData(from, true);
+        return readFileMetaData(from, true, footerDecryptor, 
encryptedFooterAAD);
       }
 
       @Override
       public FileMetaData visit(OffsetMetadataFilter filter) throws 
IOException {
-        return filterFileMetaDataByStart(readFileMetaData(from), filter);
+        return filterFileMetaDataByStart(readFileMetaData(from, 
footerDecryptor, encryptedFooterAAD), filter);
       }
 
       @Override
       public FileMetaData visit(RangeMetadataFilter filter) throws IOException 
{
-        return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
+        return filterFileMetaDataByMidpoint(readFileMetaData(from, 
footerDecryptor, encryptedFooterAAD), filter);
       }
     });
     LOG.debug("{}", fileMetaData);
-    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
+    
+    if (!encryptedFooter && null != fileDecryptor) {
+      if (!fileMetaData.isSetEncryption_algorithm()) { // Plaintext file
+        fileDecryptor.setPlaintextFile();
+        // Done to detect files that were not encrypted by mistake
+        if (!fileDecryptor.plaintextFilesAllowed()) {
+          throw new IOException("Applying decryptor on plaintext file");
+        }
+      } else {  // Encrypted file with plaintext footer
+        // if no fileDecryptor, can still read plaintext columns
+        
fileDecryptor.setFileCryptoMetaData(fileMetaData.getEncryption_algorithm(), 
false, 
+            fileMetaData.getFooter_signing_key_metadata());
+        if (fileDecryptor.checkFooterIntegrity()) {
+          verifyFooterIntegrity(from, fileDecryptor, combinedFooterLength);
+        }
+      }
+    }
+    
+    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, 
fileDecryptor, encryptedFooter);
     if (LOG.isDebugEnabled()) 
LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
     return parquetMetadata;
   }
+  
+  public ColumnChunkMetaData buildColumnChunkMetaData(ColumnMetaData metaData, 
ColumnPath columnPath, PrimitiveType type, String createdBy) {
+    return ColumnChunkMetaData.get(
+        columnPath,
+        type,
+        fromFormatCodec(metaData.codec),
+        convertEncodingStats(metaData.getEncoding_stats()),
+        fromFormatEncodings(metaData.encodings),
+        fromParquetStatistics(
+            createdBy,
+            metaData.statistics,
+            type),
+        metaData.data_page_offset,
+        metaData.dictionary_page_offset,
+        metaData.num_values,
+        metaData.total_compressed_size,
+        metaData.total_uncompressed_size);
+  }
 
   public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) 
throws IOException {
+    return fromParquetMetadata(parquetMetadata, null, false);
+  }
+
+  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, 
+      InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws 
IOException {
     MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), 
parquetMetadata.getColumn_orders());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
     List<RowGroup> row_groups = parquetMetadata.getRow_groups();
+    
     if (row_groups != null) {
       for (RowGroup rowGroup : row_groups) {
         BlockMetaData blockMetaData = new BlockMetaData();
         blockMetaData.setRowCount(rowGroup.getNum_rows());
         blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+        // not set in legacy files
+        if (rowGroup.isSetOrdinal()) {
+          blockMetaData.setOrdinal(rowGroup.getOrdinal());
+        }
         List<ColumnChunk> columns = rowGroup.getColumns();
         String filePath = columns.get(0).getFile_path();
+        short columnOrdinal = -1;
         for (ColumnChunk columnChunk : columns) {
+          columnOrdinal++;
           if ((filePath == null && columnChunk.getFile_path() != null)
               || (filePath != null && 
!filePath.equals(columnChunk.getFile_path()))) {
             throw new ParquetDecodingException("all column chunks of the same 
row group must be in the same file for now");
           }
           ColumnMetaData metaData = columnChunk.meta_data;
-          ColumnPath path = getPath(metaData);
-          ColumnChunkMetaData column = ColumnChunkMetaData.get(
-              path,
-              messageType.getType(path.toArray()).asPrimitiveType(),
-              fromFormatCodec(metaData.codec),
-              convertEncodingStats(metaData.getEncoding_stats()),
-              fromFormatEncodings(metaData.encodings),
-              fromParquetStatistics(
-                  parquetMetadata.getCreated_by(),
-                  metaData.statistics,
-                  messageType.getType(path.toArray()).asPrimitiveType()),
-              metaData.data_page_offset,
-              metaData.dictionary_page_offset,
-              metaData.num_values,
-              metaData.total_compressed_size,
-              metaData.total_uncompressed_size);
+          ColumnCryptoMetaData cryptoMetaData = 
columnChunk.getCrypto_metadata();
+          ColumnChunkMetaData column = null;
+          ColumnPath columnPath = null;
+          boolean encryptedMetadata = false;
+          
+          if (null == cryptoMetaData) { // Plaintext column
+            if (null == metaData) {
+              throw new IOException("ColumnMetaData not set in plaintext 
column");
+            }
+            columnPath = getPath(metaData);
+            if (null != fileDecryptor && !fileDecryptor.plaintextFile()) {
+              // mark this column as plaintext in encrypted file decryptor
+              fileDecryptor.setColumnCryptoMetadata(columnPath, false, false, 
(byte[]) null, columnOrdinal);
+            }
+          } else {  // Encrypted column
+            boolean encryptedWithFooterKey = 
cryptoMetaData.isSetENCRYPTION_WITH_FOOTER_KEY();
+            if (encryptedWithFooterKey) { // Column encrypted with footer key
+              if (!encryptedFooter) {
+                throw new IOException("Column encrypted with footer key in 
file with plaintext footer");

Review comment:
       These `IOException`s seem to be thrown for encryption-related issues. 
Shouldn't we use the specific exception instead?
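
A minimal sketch of what a dedicated exception buys the caller. It is not taken from the pull request: `FooterCryptoException`, `checkColumn` and `classify` are hypothetical names defined only for this illustration, and the exception the project actually uses may differ in name and superclass.

```java
final class CryptoExceptionSketch {

  /** Hypothetical exception type, defined here only for illustration. */
  static class FooterCryptoException extends RuntimeException {
    FooterCryptoException(String message) {
      super(message);
    }
  }

  /** Same condition as in the diff: a footer-key column requires an encrypted footer. */
  static void checkColumn(boolean encryptedWithFooterKey, boolean encryptedFooter) {
    if (encryptedWithFooterKey && !encryptedFooter) {
      throw new FooterCryptoException(
          "Column encrypted with footer key in file with plaintext footer");
    }
  }

  /** Shows that a caller can single out crypto failures by catching the dedicated type. */
  static String classify(boolean encryptedWithFooterKey, boolean encryptedFooter) {
    try {
      checkColumn(encryptedWithFooterKey, encryptedFooter);
      return "ok";
    } catch (FooterCryptoException e) {
      return "crypto failure: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(classify(true, true));
    System.out.println(classify(true, false));
  }
}
```

With a plain `IOException`, the same caller could only tell these cases apart from ordinary read errors by inspecting the message text.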

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -705,22 +797,39 @@ public ParquetFileReader(Configuration conf, Path file, 
ParquetMetadata footer)
       paths.put(ColumnPath.get(col.getPath()), col);
     }
     this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+    this.fileDecryptor = fileMetaData.getFileDecryptor();
   }
 
   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws 
IOException {
+    this(file, options, null);
+  }
+
+  public ParquetFileReader(InputFile file, ParquetReadOptions options, 
+      FileDecryptionProperties fileDecryptionProperties) throws IOException {
     this.converter = new ParquetMetadataConverter(options);
     this.file = file;
     this.f = file.newStream();
     this.options = options;
+    if ((null == fileDecryptionProperties) && (file instanceof 
HadoopInputFile)) {
+      HadoopInputFile hadoopFile = (HadoopInputFile) file;
+      fileDecryptionProperties = getDecryptionProperties(hadoopFile.getPath(), 
hadoopFile.getConfiguration());

Review comment:
       I think it would be better to put the `FileDecryptionProperties` into 
`ParquetReadOptions`. `HadoopReadOptions` (the extension of 
`ParquetReadOptions`) already contains the Hadoop conf, so you may be able to 
create the `FileDecryptionProperties` from there if they are not set.
       This way you do not need to add new methods/constructors wherever 
`ParquetReadOptions` is already an argument.
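
A rough sketch of the shape this suggestion could take, using simplified stand-ins rather than the real parquet-mr classes. `DecryptionProps`, `ReadOptions`, `HadoopLikeBuilder`, `Reader` and the `sketch.decryption.key.id` conf key are all hypothetical; only the idea (options carry the decryption properties, the Hadoop-aware builder derives them from the conf when unset) comes from the comment above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class ReadOptionsSketch {

  /** Hypothetical conf key, used only in this sketch. */
  static final String KEY_ID_CONF = "sketch.decryption.key.id";

  /** Hypothetical stand-in for FileDecryptionProperties. */
  static final class DecryptionProps {
    final String keyId;
    DecryptionProps(String keyId) {
      this.keyId = Objects.requireNonNull(keyId);
    }
  }

  /** Hypothetical stand-in for ParquetReadOptions: carries optional decryption properties. */
  static final class ReadOptions {
    private final DecryptionProps decryptionProps; // null means "no decryption configured"
    ReadOptions(DecryptionProps decryptionProps) {
      this.decryptionProps = decryptionProps;
    }
    DecryptionProps getDecryptionProps() {
      return decryptionProps;
    }
  }

  /** Plain builder: uses only what was set explicitly. */
  static class Builder {
    protected DecryptionProps decryptionProps;
    Builder withDecryption(DecryptionProps props) {
      this.decryptionProps = props;
      return this;
    }
    ReadOptions build() {
      return new ReadOptions(decryptionProps);
    }
  }

  /** Hadoop-like builder: falls back to the conf when nothing was set explicitly. */
  static final class HadoopLikeBuilder extends Builder {
    private final Map<String, String> conf;
    HadoopLikeBuilder(Map<String, String> conf) {
      this.conf = conf;
    }
    @Override
    ReadOptions build() {
      DecryptionProps props = decryptionProps;
      if (props == null) {
        String keyId = conf.get(KEY_ID_CONF);
        if (keyId != null) {
          props = new DecryptionProps(keyId);
        }
      }
      return new ReadOptions(props);
    }
  }

  /** The reader keeps its existing (file, options) signature; no extra constructor needed. */
  static final class Reader {
    final DecryptionProps decryptionProps;
    Reader(String file, ReadOptions options) {
      this.decryptionProps = options.getDecryptionProps();
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(KEY_ID_CONF, "footer-key");
    Reader reader = new Reader("data.parquet", new HadoopLikeBuilder(conf).build());
    System.out.println(reader.decryptionProps.keyId);
  }
}
```

The point of the design is that every entry point already taking the options object picks up decryption support without a new overload.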

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -1021,18 +1148,44 @@ DictionaryPage readDictionary(ColumnChunkMetaData meta) 
throws IOException {
         !meta.getEncodings().contains(Encoding.RLE_DICTIONARY)) {
       return null;
     }
+    /** TODO Gabor - can be replaced with this?:
+    if (!meta.hasDictionaryPage()) {
+      return null;
+    } */

Review comment:
       I think we can replace it.
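
For illustration only, a self-contained sketch of why the two forms would be interchangeable. It rests on two assumptions that the quoted hunk does not confirm: that the part of the existing condition cut off above the hunk checks `PLAIN_DICTIONARY`, and that `hasDictionaryPage()` looks at the same two encodings. `ChunkMeta` is a hypothetical stand-in for `ColumnChunkMetaData`.

```java
import java.util.EnumSet;
import java.util.Set;

final class DictionaryCheckSketch {

  enum Encoding { PLAIN, PLAIN_DICTIONARY, RLE_DICTIONARY }

  /** Hypothetical stand-in for ColumnChunkMetaData. */
  static final class ChunkMeta {
    private final Set<Encoding> encodings;
    ChunkMeta(Set<Encoding> encodings) {
      this.encodings = encodings;
    }
    Set<Encoding> getEncodings() {
      return encodings;
    }
    /** Assumed behaviour: true if any dictionary encoding is listed. */
    boolean hasDictionaryPage() {
      return encodings.contains(Encoding.PLAIN_DICTIONARY)
          || encodings.contains(Encoding.RLE_DICTIONARY);
    }
  }

  /** Existing style of check (first half assumed, second half as in the hunk). */
  static boolean skipOld(ChunkMeta meta) {
    return !meta.getEncodings().contains(Encoding.PLAIN_DICTIONARY)
        && !meta.getEncodings().contains(Encoding.RLE_DICTIONARY);
  }

  /** Proposed replacement from the TODO. */
  static boolean skipNew(ChunkMeta meta) {
    return !meta.hasDictionaryPage();
  }

  public static void main(String[] args) {
    ChunkMeta plain = new ChunkMeta(EnumSet.of(Encoding.PLAIN));
    ChunkMeta dict = new ChunkMeta(EnumSet.of(Encoding.PLAIN, Encoding.RLE_DICTIONARY));
    System.out.println(skipOld(plain) == skipNew(plain));
    System.out.println(skipOld(dict) == skipNew(dict));
  }
}
```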

##########
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##########
@@ -499,55 +528,100 @@ public static final ParquetMetadata 
readFooter(Configuration configuration, File
    */
   @Deprecated
   public static final ParquetMetadata readFooter(InputFile file, 
MetadataFilter filter) throws IOException {
+    return readFooter(file, filter, null);
+  }
+
+  @Deprecated
+  public static final ParquetMetadata readFooter(InputFile file, 
MetadataFilter filter, 
+      FileDecryptionProperties fileDecryptionProperties) throws IOException {
     ParquetReadOptions options;
     if (file instanceof HadoopInputFile) {
-      options = HadoopReadOptions.builder(((HadoopInputFile) 
file).getConfiguration())
+      HadoopInputFile hadoopFile = (HadoopInputFile) file;
+      options = HadoopReadOptions.builder(hadoopFile.getConfiguration())
           .withMetadataFilter(filter).build();
+      if (null == fileDecryptionProperties) {
+        fileDecryptionProperties = 
getDecryptionProperties(hadoopFile.getPath(), hadoopFile.getConfiguration());
+      }
     } else {
       options = 
ParquetReadOptions.builder().withMetadataFilter(filter).build();
     }
 
     try (SeekableInputStream in = file.newStream()) {
-      return readFooter(file, options, in);
+      return readFooter(file, options, in, fileDecryptionProperties);
     }
   }
 
-  private static final ParquetMetadata readFooter(InputFile file, 
ParquetReadOptions options, SeekableInputStream f) throws IOException {
+  private static final ParquetMetadata readFooter(InputFile file, 
ParquetReadOptions options, SeekableInputStream f, 
+      FileDecryptionProperties fileDecryptionProperties) throws IOException {
     ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
-    return readFooter(file, options, f, converter);
+    return readFooter(file, options, f, converter, fileDecryptionProperties);
   }
 
-  private static final ParquetMetadata readFooter(InputFile file, 
ParquetReadOptions options, SeekableInputStream f, ParquetMetadataConverter 
converter) throws IOException {
+  private static final ParquetMetadata readFooter(InputFile file, 
ParquetReadOptions options, 
+      SeekableInputStream f, ParquetMetadataConverter converter, 
+      FileDecryptionProperties fileDecryptionProperties) throws IOException {
+
     long fileLen = file.getLength();
+    String filePath = file.toString();
     LOG.debug("File length {}", fileLen);
+
     int FOOTER_LENGTH_SIZE = 4;
     if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC 
+ data + footer + footerIndex + MAGIC
-      throw new RuntimeException(file.toString() + " is not a Parquet file 
(too small length: " + fileLen + ")");
+      throw new RuntimeException(filePath + " is not a Parquet file (length is 
too low: " + fileLen + ")");
     }
-    long footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length;
-    LOG.debug("reading footer index at {}", footerLengthIndex);
 
-    f.seek(footerLengthIndex);
-    int footerLength = readIntLittleEndian(f);
+    // Read footer length and magic string - with a single seek
     byte[] magic = new byte[MAGIC.length];
+    long fileMetadataLengthIndex = fileLen - magic.length - FOOTER_LENGTH_SIZE;
+    LOG.debug("reading footer index at {}", fileMetadataLengthIndex);
+    f.seek(fileMetadataLengthIndex);
+    int fileMetadataLength = readIntLittleEndian(f);
     f.readFully(magic);
-    if (!Arrays.equals(MAGIC, magic)) {
-      throw new RuntimeException(file.toString() + " is not a Parquet file. 
expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + 
Arrays.toString(magic));
+
+    boolean encryptedFooterMode;
+    if (Arrays.equals(MAGIC, magic)) {
+      encryptedFooterMode = false;
+    } else if (Arrays.equals(EFMAGIC, magic)) {
+      encryptedFooterMode = true;
+    } else {
+      throw new RuntimeException(filePath + " is not a Parquet file. Expected 
magic number at tail, but found " + Arrays.toString(magic));
+    }
+
+    long fileMetadataIndex = fileMetadataLengthIndex - fileMetadataLength;
+    LOG.debug("read footer length: {}, footer index: {}", fileMetadataLength, 
fileMetadataIndex);
+    if (fileMetadataIndex < magic.length || fileMetadataIndex >= 
fileMetadataLengthIndex) {
+      throw new RuntimeException("corrupted file: the footer index is not 
within the file: " + fileMetadataIndex);
     }
-    long footerIndex = footerLengthIndex - footerLength;
-    LOG.debug("read footer length: {}, footer index: {}", footerLength, 
footerIndex);
-    if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
-      throw new RuntimeException("corrupted file: the footer index is not 
within the file: " + footerIndex);
+    f.seek(fileMetadataIndex);
+
+    InternalFileDecryptor fileDecryptor = null;
+    if (null != fileDecryptionProperties) {
+      fileDecryptor  = new InternalFileDecryptor(fileDecryptionProperties);
     }
-    f.seek(footerIndex);
+
     // Read all the footer bytes in one time to avoid multiple read operations,
     // since it can be pretty time consuming for a single read operation in 
HDFS.
-    ByteBuffer footerBytesBuffer = ByteBuffer.allocate(footerLength);
+    ByteBuffer footerBytesBuffer = ByteBuffer.allocate(fileMetadataLength);
     f.readFully(footerBytesBuffer);
     LOG.debug("Finished to read all footer bytes.");
     footerBytesBuffer.flip();
     InputStream footerBytesStream = 
ByteBufferInputStream.wrap(footerBytesBuffer);
-    return converter.readParquetMetadata(footerBytesStream, 
options.getMetadataFilter());
+
+    // Regular file, or encrypted file with plaintext footer
+    if (!encryptedFooterMode) {
+      return converter.readParquetMetadata(footerBytesStream, 
options.getMetadataFilter(), fileDecryptor, false, 
+          fileMetadataLength);
+    }
+
+    // Encrypted file with encrypted footer
+    if (null == fileDecryptor) {
+      throw new RuntimeException("Trying to read file with encrypted footer. 
No keys available");

Review comment:
       I would use the specific crypto exception here.






> parquet-mr code changes for encryption support
> ----------------------------------------------
>
>                 Key: PARQUET-1229
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1229
>             Project: Parquet
>          Issue Type: Sub-task
>          Components: parquet-mr
>            Reporter: Gidon Gershinsky
>            Assignee: Gidon Gershinsky
>            Priority: Major
>              Labels: pull-request-available
>
> Addition of encryption/decryption support to the existing Parquet classes and 
> APIs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
