This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new e4988f3  Parquet-1872: Add TransCompression command to parquet-tools 
(#796)
e4988f3 is described below

commit e4988f3489663b99ff35a8573bab5522d5e6dcf8
Author: shangxinli <[email protected]>
AuthorDate: Tue Jun 23 02:40:04 2020 -0700

    Parquet-1872: Add TransCompression command to parquet-tools (#796)
---
 .../cli/commands/TransCompressionCommand.java      |  96 +++++++
 .../apache/parquet/hadoop/ParquetFileReader.java   |   2 +-
 .../parquet/hadoop/util/CompressionConverter.java  | 271 +++++++++++++++++
 .../parquet/hadoop/util/H1SeekableInputStream.java |   2 +-
 .../hadoop/util/CompressionConveterTest.java       | 319 +++++++++++++++++++++
 .../tools/command/TransCompressionCommand.java     |  92 ++++++
 6 files changed, 780 insertions(+), 2 deletions(-)

diff --git 
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
 
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
new file mode 100644
index 0000000..cae6810
--- /dev/null
+++ 
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.cli.commands;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.cli.BaseCommand;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import 
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+@Parameters(commandDescription="Translate the compression from one to another")
+public class TransCompressionCommand extends BaseCommand {
+
+  private CompressionConverter compressionConverter;
+
+  public TransCompressionCommand(Logger console) {
+    super(console);
+    compressionConverter = new CompressionConverter();
+  }
+
+  @Parameter(description = "<input parquet file path>")
+  String input;
+
+  @Parameter(description = "<output parquet file path>")
+  String output;
+
+  @Parameter(description = "<new compression codec")
+  String codec;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public int run() throws IOException {
+    Preconditions.checkArgument(input != null && output != null,
+      "Both input and output parquet file paths are required.");
+
+    Preconditions.checkArgument(codec != null,
+      "The codec cannot be null");
+
+    Path inPath = new Path(input);
+    Path outPath = new Path(output);
+    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(getConf(), inPath, 
NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(getConf(), schema, 
outPath, ParquetFileWriter.Mode.CREATE);
+    writer.start();
+
+    try (TransParquetFileReader reader = new 
TransParquetFileReader(HadoopInputFile.fromPath(inPath, getConf()), 
HadoopReadOptions.builder(getConf()).build())) {
+      compressionConverter.processBlocks(reader, writer, metaData, schema, 
metaData.getFileMetaData().getCreatedBy(), codecName);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+    return 0;
+  }
+
+  @Override
+  public List<String> getExamples() {
+    return Lists.newArrayList(
+        "# Translate the compression from one to another",
+        " input.parquet output.parquet ZSTD"
+    );
+  }
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 18fbf6d..987b603 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -613,8 +613,8 @@ public class ParquetFileReader implements Closeable {
     return new ParquetFileReader(file, options);
   }
 
+  protected final SeekableInputStream f;
   private final InputFile file;
-  private final SeekableInputStream f;
   private final ParquetReadOptions options;
   private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
   private final FileMetaData fileMetaData; // may be null
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
new file mode 100644
index 0000000..922699f
--- /dev/null
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class CompressionConverter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CompressionConverter.class);
+
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private byte[] pageBuffer;
+
+  public CompressionConverter() {
+    this.pageBuffer = new byte[pageBufferSize];
+  }
+
+  public void processBlocks(TransParquetFileReader reader, ParquetFileWriter 
writer, ParquetMetadata meta, MessageType schema,
+                             String createdBy, CompressionCodecName codecName) 
throws IOException {
+    int blockIndex = 0;
+    PageReadStore store = reader.readNextRowGroup();
+    while (store != null) {
+      writer.startBlock(store.getRowCount());
+      BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex);
+      List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
+      Map<ColumnPath, ColumnDescriptor> descriptorsMap = 
schema.getColumns().stream().collect(
+        Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
+        ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new 
DummyGroupConverter(), schema, createdBy);
+        ColumnDescriptor columnDescriptor = 
descriptorsMap.get(chunk.getPath());
+        writer.startColumn(columnDescriptor, 
crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName);
+        processChunk(reader, writer, chunk, createdBy, codecName);
+        writer.endColumn();
+      }
+      writer.endBlock();
+      store = reader.readNextRowGroup();
+      blockIndex++;
+    }
+  }
+
+  private void processChunk(TransParquetFileReader reader, ParquetFileWriter 
writer, ColumnChunkMetaData chunk,
+                            String createdBy, CompressionCodecName codecName) 
throws IOException {
+    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    CompressionCodecFactory.BytesInputDecompressor decompressor = 
codecFactory.getDecompressor(chunk.getCodec());
+    CompressionCodecFactory.BytesInputCompressor compressor = 
codecFactory.getCompressor(codecName);
+    ColumnIndex columnIndex = reader.readColumnIndex(chunk);
+    OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
+
+    reader.setStreamPosition(chunk.getStartingPos());
+    DictionaryPage dictionaryPage = null;
+    long readValues = 0;
+    Statistics statistics = null;
+    ParquetMetadataConverter converter = new ParquetMetadataConverter();
+    int pageIndex = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      PageHeader pageHeader = reader.readPageHeader();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      byte[] pageLoad;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          if (dictionaryPage != null) {
+            throw new IOException("has more than one dictionary page in column 
chunk");
+          }
+          DictionaryPageHeader dictPageHeader = 
pageHeader.dictionary_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, 
pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          writer.writeDictionaryPage(new 
DictionaryPage(BytesInput.from(pageLoad),
+                                                   
pageHeader.getUncompressed_page_size(),
+                                                   
dictPageHeader.getNum_values(),
+                                                   
converter.getEncoding(dictPageHeader.getEncoding())));
+          break;
+        case DATA_PAGE:
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          pageLoad = translatePageLoad(reader, true, compressor, decompressor, 
pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), 
headerV1.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV1.getNum_values();
+          if (offsetIndex != null) {
+            long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, 
totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex);
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.getUncompressed_page_size(),
+              BytesInput.from(pageLoad),
+              statistics,
+              toIntWithCheck(rowCount),
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          } else {
+            writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
+              pageHeader.getUncompressed_page_size(),
+              BytesInput.from(pageLoad),
+              statistics,
+              converter.getEncoding(headerV1.getRepetition_level_encoding()),
+              converter.getEncoding(headerV1.getDefinition_level_encoding()),
+              converter.getEncoding(headerV1.getEncoding()));
+          }
+          pageIndex++;
+          break;
+        case DATA_PAGE_V2:
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          BytesInput rlLevels = readBlockAllocate(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          BytesInput dlLevels = readBlockAllocate(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength 
- dlLength;
+          int rawDataLength = pageHeader.getUncompressed_page_size() - 
rlLength - dlLength;
+          pageLoad = translatePageLoad(reader, headerV2.is_compressed, 
compressor, decompressor, payLoadLength, rawDataLength);
+          statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), 
headerV2.getStatistics(), columnIndex, pageIndex, converter);
+          readValues += headerV2.getNum_values();
+          writer.writeDataPageV2(headerV2.getNum_rows(),
+            headerV2.getNum_nulls(),
+            headerV2.getNum_values(),
+            rlLevels,
+            dlLevels,
+            converter.getEncoding(headerV2.getEncoding()),
+            BytesInput.from(pageLoad),
+            rawDataLength,
+            statistics);
+          pageIndex++;
+          break;
+        default:
+          LOG.debug("skipping page of type {} of size {}", 
pageHeader.getType(), compressedPageSize);
+          break;
+      }
+    }
+  }
+
+  private Statistics convertStatistics(String createdBy, PrimitiveType type, 
org.apache.parquet.format.Statistics pageStatistics,
+                                       ColumnIndex columnIndex, int pageIndex, 
ParquetMetadataConverter converter) throws IOException {
+    if (columnIndex != null) {
+      if (columnIndex.getNullPages() == null) {
+        throw new IOException("columnIndex has null variable 'nullPages' which 
indicates corrupted data for type: " +  type.getName());
+      }
+      if (pageIndex > columnIndex.getNullPages().size()) {
+        throw new IOException("There are more pages " + pageIndex + " found in 
the column than in the columnIndex " + columnIndex.getNullPages().size());
+      }
+      org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = 
org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
+      statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
+
+      if (!columnIndex.getNullPages().get(pageIndex)) {
+        
statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
+        
statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
+      }
+      return statsBuilder.build();
+    } else if (pageStatistics != null) {
+      return converter.fromParquetStatistics(createdBy, pageStatistics, type);
+    } else {
+      return null;
+    }
+  }
+
+  private byte[] translatePageLoad(TransParquetFileReader reader, boolean 
isCompressed, CompressionCodecFactory.BytesInputCompressor compressor,
+                                   
CompressionCodecFactory.BytesInputDecompressor decompressor, int payloadLength, 
int rawDataLength) throws IOException {
+    BytesInput data = readBlock(payloadLength, reader);
+    if (isCompressed) {
+      data = decompressor.decompress(data, rawDataLength);
+    }
+    BytesInput newCompressedData = compressor.compress(data);
+    return newCompressedData.toByteArray();
+  }
+
+  public BytesInput readBlock(int length, TransParquetFileReader reader) 
throws IOException {
+    byte[] data;
+    if (length > pageBufferSize) {
+      data = new byte[length];
+    } else {
+      data = pageBuffer;
+    }
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  public BytesInput readBlockAllocate(int length, TransParquetFileReader 
reader) throws IOException {
+    byte[] data = new byte[length];
+    reader.blockRead(data, 0, length);
+    return BytesInput.from(data, 0, length);
+  }
+
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("size is bigger than " + 
Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
+  private static final class DummyGroupConverter extends GroupConverter {
+    @Override public void start() {}
+    @Override public void end() {}
+    @Override public Converter getConverter(int fieldIndex) { return new 
DummyConverter(); }
+  }
+
+  private static final class DummyConverter extends PrimitiveConverter {
+    @Override public GroupConverter asGroupConverter() { return new 
DummyGroupConverter(); }
+  }
+
+  public static final class TransParquetFileReader extends ParquetFileReader {
+
+    public TransParquetFileReader(InputFile file, ParquetReadOptions options) 
throws IOException {
+      super(file, options);
+    }
+
+    public void setStreamPosition(long newPos) throws IOException {
+      f.seek(newPos);
+    }
+
+    public void blockRead(byte[] data, int start, int len) throws IOException {
+      f.readFully(data, start, len);
+    }
+
+    public PageHeader readPageHeader() throws IOException {
+      return Util.readPageHeader(f);
+    }
+
+    public long getPos() throws IOException {
+      return f.getPos();
+    }
+  }
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
index 876a1f3..0f8cdbb 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
@@ -53,7 +53,7 @@ class H1SeekableInputStream extends 
DelegatingSeekableInputStream {
 
   @Override
   public void readFully(byte[] bytes, int start, int len) throws IOException {
-    stream.readFully(bytes);
+    stream.readFully(bytes, start, len);
   }
 
 }
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
new file mode 100644
index 0000000..fefa5e4
--- /dev/null
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import 
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CompressionConveterTest {
+
+  private Configuration conf = new Configuration();
+  private Map<String, String> extraMeta
+    = ImmutableMap.of("key1", "value1", "key2", "value2");
+  private CompressionConverter compressionConverter = new 
CompressionConverter();
+  private Random rnd = new Random(5);
+
+  @Test
+  public void testTransCompression() throws Exception {
+    String[] codecs = {"UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD"};
+    for (int i = 0; i < codecs.length; i++) {
+      for (int j = 0; j <codecs.length; j++) {
+        // Same codec for both are considered as valid test case
+        testInternal(codecs[i], codecs[j], 
ParquetProperties.WriterVersion.PARQUET_1_0, 
ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], 
ParquetProperties.WriterVersion.PARQUET_2_0, 
ParquetProperties.DEFAULT_PAGE_SIZE);
+        testInternal(codecs[i], codecs[j], 
ParquetProperties.WriterVersion.PARQUET_1_0, 64);
+        testInternal(codecs[i], codecs[j], 
ParquetProperties.WriterVersion.PARQUET_1_0, 
ParquetProperties.DEFAULT_PAGE_SIZE * 100);
+      }
+    }
+  }
+
+  private void testInternal(String srcCodec, String destCodec, 
ParquetProperties.WriterVersion writerVersion, int pageSize) throws Exception {
+    int numRecord = 1000;
+    TestDocs testDocs = new TestDocs(numRecord);
+    String inputFile = createParquetFile(conf, extraMeta, numRecord, "input", 
srcCodec, writerVersion, pageSize, testDocs);
+    String outputFile = createTempFile("output_trans");
+
+    convertCompression(conf, inputFile, outputFile, destCodec);
+
+    validateColumns(outputFile, numRecord, testDocs);
+    validMeta(inputFile, outputFile);
+    validColumnIndex(inputFile, outputFile);
+  }
+
+  private void convertCompression(Configuration conf, String inputFile, String 
outputFile, String codec) throws IOException {
+    Path inPath = new Path(inputFile);
+    Path outPath = new Path(outputFile);
+    CompressionCodecName codecName = CompressionCodecName.valueOf(codec);
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, 
NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, 
ParquetFileWriter.Mode.CREATE);
+    writer.start();
+
+    try (TransParquetFileReader reader = new 
TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), 
HadoopReadOptions.builder(conf).build())) {
+      compressionConverter.processBlocks(reader, writer, metaData, schema, 
metaData.getFileMetaData().getCreatedBy(), codecName);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+  }
+
+  private void validateColumns(String file, int numRecord, TestDocs testDocs) 
throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new 
GroupReadSupport(), new Path(file)).withConf(conf).build();
+    for (int i = 0; i < numRecord; i++) {
+      Group group = reader.read();
+      assertTrue(group.getLong("DocId", 0) == testDocs.docId[i]);
+      assertArrayEquals(group.getBinary("Name", 0).getBytes(), 
testDocs.name[i].getBytes());
+      assertArrayEquals(group.getBinary("Gender", 0).getBytes(), 
testDocs.gender[i].getBytes());
+      Group subGroup = group.getGroup("Links", 0);
+      assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(), 
testDocs.linkBackward[i].getBytes());
+      assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(), 
testDocs.linkForward[i].getBytes());
+    }
+    reader.close();
+  }
+
+  private void validMeta(String inputFile, String outFile) throws Exception {
+    ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new 
Path(inputFile), NO_FILTER);
+    ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new 
Path(outFile), NO_FILTER);
+    Assert.assertEquals(inMetaData.getFileMetaData().getSchema(), 
outMetaData.getFileMetaData().getSchema());
+    Assert.assertEquals(inMetaData.getFileMetaData().getKeyValueMetaData(), 
outMetaData.getFileMetaData().getKeyValueMetaData());
+  }
+
+  private void validColumnIndex(String inputFile, String outFile) throws 
Exception {
+    ParquetMetadata inMetaData = ParquetFileReader.readFooter(conf, new 
Path(inputFile), NO_FILTER);
+    ParquetMetadata outMetaData = ParquetFileReader.readFooter(conf, new 
Path(outFile), NO_FILTER);
+    Assert.assertEquals(inMetaData.getBlocks().size(), 
outMetaData.getBlocks().size());
+    try (TransParquetFileReader inReader = new 
TransParquetFileReader(HadoopInputFile.fromPath(new Path(inputFile), conf), 
HadoopReadOptions.builder(conf).build());
+         TransParquetFileReader outReader = new 
TransParquetFileReader(HadoopInputFile.fromPath(new Path(outFile), conf), 
HadoopReadOptions.builder(conf).build())) {
+      for (int i = 0; i < inMetaData.getBlocks().size(); i++) {
+        BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i);
+        BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i);
+        Assert.assertEquals(inBlockMetaData.getColumns().size(), 
outBlockMetaData.getColumns().size());
+        for (int j = 0; j < inBlockMetaData.getColumns().size(); j++) {
+          ColumnChunkMetaData inChunk = inBlockMetaData.getColumns().get(j);
+          ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
+          OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
+          ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j);
+          ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk);
+          OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
+          if (inColumnIndex != null) {
+            Assert.assertEquals(inColumnIndex.getBoundaryOrder(), 
outColumnIndex.getBoundaryOrder());
+            Assert.assertEquals(inColumnIndex.getMaxValues(), 
outColumnIndex.getMaxValues());
+            Assert.assertEquals(inColumnIndex.getMinValues(), 
outColumnIndex.getMinValues());
+            Assert.assertEquals(inColumnIndex.getNullCounts(), 
outColumnIndex.getNullCounts());
+          }
+          if (inOffsetIndex != null) {
+            List<Long> inOffsets = getOffsets(inReader, inChunk);
+            List<Long> outOffsets = getOffsets(outReader, outChunk);
+            Assert.assertEquals(inOffsets.size(), outOffsets.size());
+            Assert.assertEquals(inOffsets.size(), 
inOffsetIndex.getPageCount());
+            Assert.assertEquals(inOffsetIndex.getPageCount(), 
outOffsetIndex.getPageCount());
+            for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
+              Assert.assertEquals(inOffsetIndex.getFirstRowIndex(k), 
outOffsetIndex.getFirstRowIndex(k));
+              Assert.assertEquals(inOffsetIndex.getLastRowIndex(k, 
inChunk.getValueCount()),
+                outOffsetIndex.getLastRowIndex(k, outChunk.getValueCount()));
+              Assert.assertEquals(inOffsetIndex.getOffset(k), 
(long)inOffsets.get(k));
+              Assert.assertEquals(outOffsetIndex.getOffset(k), 
(long)outOffsets.get(k));
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private List<Long> getOffsets(TransParquetFileReader reader, 
ColumnChunkMetaData chunk) throws IOException {
+    List<Long> offsets = new ArrayList<>();
+    reader.setStreamPosition(chunk.getStartingPos());
+    long readValues = 0;
+    long totalChunkValues = chunk.getValueCount();
+    while (readValues < totalChunkValues) {
+      long curOffset = reader.getPos();
+      PageHeader pageHeader = reader.readPageHeader();
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          compressionConverter.readBlock(pageHeader.getCompressed_page_size(), 
reader);
+          break;
+        case DATA_PAGE:
+          DataPageHeader headerV1 = pageHeader.data_page_header;
+          offsets.add(curOffset);
+          compressionConverter.readBlock(pageHeader.getCompressed_page_size(), 
reader);
+          readValues += headerV1.getNum_values();
+          break;
+        case DATA_PAGE_V2:
+          DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
+          offsets.add(curOffset);
+          int rlLength = headerV2.getRepetition_levels_byte_length();
+          compressionConverter.readBlock(rlLength, reader);
+          int dlLength = headerV2.getDefinition_levels_byte_length();
+          compressionConverter.readBlock(dlLength, reader);
+          int payLoadLength = pageHeader.getCompressed_page_size() - rlLength 
- dlLength;
+          compressionConverter.readBlock(payLoadLength, reader);
+          readValues += headerV2.getNum_values();
+          break;
+        default:
+          throw new IOException("Not recognized page type");
+      }
+    }
+    return offsets;
+  }
+
+  private String createParquetFile(Configuration conf, Map<String, String> 
extraMeta, int numRecord, String prefix, String codec,
+                                         ParquetProperties.WriterVersion 
writerVersion, int pageSize, TestDocs testDocs) throws IOException {
+    MessageType schema = new MessageType("schema",
+      new PrimitiveType(REQUIRED, INT64, "DocId"),
+      new PrimitiveType(REQUIRED, BINARY, "Name"),
+      new PrimitiveType(REQUIRED, BINARY, "Gender"),
+      new GroupType(OPTIONAL, "Links",
+        new PrimitiveType(REPEATED, BINARY, "Backward"),
+        new PrimitiveType(REPEATED, BINARY, "Forward")));
+
+    conf.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString());
+
+    String file = createTempFile(prefix);
+    ExampleParquetWriter.Builder builder = ExampleParquetWriter.builder(new 
Path(file))
+      .withConf(conf)
+      .withWriterVersion(writerVersion)
+      .withExtraMetaData(extraMeta)
+      .withDictionaryEncoding("DocId", true)
+      .withValidation(true)
+      .enablePageWriteChecksum()
+      .withPageSize(pageSize)
+      .withCompressionCodec(CompressionCodecName.valueOf(codec));
+    try (ParquetWriter writer = builder.build()) {
+      for (int i = 0; i < numRecord; i++) {
+        SimpleGroup g = new SimpleGroup(schema);
+        g.add("DocId", testDocs.docId[i]);
+        g.add("Name", testDocs.name[i]);
+        g.add("Gender", testDocs.gender[i]);
+        Group links = g.addGroup("Links");
+        links.add(0, testDocs.linkBackward[i]);
+        links.add(1, testDocs.linkForward[i]);
+        writer.write(g);
+      }
+    }
+
+    return file;
+  }
+
+  private static long getLong() {
+    return ThreadLocalRandom.current().nextLong(1000);
+  }
+
+  private String getString() {
+    char[] chars = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'x', 'z', 'y'};
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < 100; i++) {
+      sb.append(chars[rnd.nextInt(10)]);
+    }
+    return sb.toString();
+  }
+
+  private String createTempFile(String prefix) {
+    try {
+      return Files.createTempDirectory(prefix).toAbsolutePath().toString() + 
"/test.parquet";
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private  class TestDocs {
+    public long[] docId;
+    public String[] name;
+    public String[] gender;
+    public String[] linkBackward;
+    public String[] linkForward;
+
+    public TestDocs(int numRecord) {
+      docId = new long[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        docId[i] = getLong();
+      }
+
+      name = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        name[i] = getString();
+      }
+
+      gender = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        gender[i] = getString();
+      }
+
+      linkBackward = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        linkBackward[i] = getString();
+      }
+
+      linkForward = new String[numRecord];
+      for (int i = 0; i < numRecord; i++) {
+        linkForward[i] = getString();
+      }
+    }
+  }
+}
diff --git 
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java
 
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java
new file mode 100644
index 0000000..1348a63
--- /dev/null
+++ 
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.tools.command;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import 
org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+
+import static 
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+
+public class TransCompressionCommand extends ArgsOnlyCommand {
+
+  public static final String[] USAGE = new String[] {
+    "<input> <output> <codec_name>",
+
+    "where <input> is the source parquet file",
+    "    <output> is the destination parquet file," +
+    "    <new_codec_name> is the codec name in the case sensitive format to be 
translated to, e.g. SNAPPY, GZIP, ZSTD, LZO, LZ4, BROTLI, UNCOMPRESSED"
+  };
+
+  private Configuration conf;
+  private CompressionConverter compressionConverter;
+
+  public TransCompressionCommand() {
+    super(3, 3);
+    this.conf = new Configuration();
+    compressionConverter = new CompressionConverter();
+  }
+
+  public TransCompressionCommand(Configuration conf) {
+    super(3, 3);
+    this.conf = conf;
+    compressionConverter = new CompressionConverter();
+  }
+
+  @Override
+  public String[] getUsageDescription() {
+    return USAGE;
+  }
+
+  @Override
+  public String getCommandDescription() {
+    return "Translate the compression of a given Parquet file to a new 
compression one to a new Parquet file.";
+  }
+
+  @Override
+  public void execute(CommandLine options) throws Exception {
+    super.execute(options);
+    List<String> args = options.getArgList();
+    Path inPath = new Path(args.get(0));
+    Path outPath = new Path(args.get(1));
+    CompressionCodecName codecName = CompressionCodecName.valueOf(args.get(2));
+
+    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, 
NO_FILTER);
+    MessageType schema = metaData.getFileMetaData().getSchema();
+    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, 
ParquetFileWriter.Mode.CREATE);
+    writer.start();
+
+    try (TransParquetFileReader reader = new 
TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), 
HadoopReadOptions.builder(conf).build())) {
+      compressionConverter.processBlocks(reader, writer, metaData, schema, 
metaData.getFileMetaData().getCreatedBy(), codecName);
+    } finally {
+      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
+    }
+  }
+}

Reply via email to