This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new ab42fe5 Revert "PARQUET-1381: Add merge blocks command to
parquet-tools (#512)" (#621)
ab42fe5 is described below
commit ab42fe5180366120336fb3f8b9e6540aadb5da1b
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Mon Feb 25 13:42:46 2019 +0100
Revert "PARQUET-1381: Add merge blocks command to parquet-tools (#512)"
(#621)
This reverts commit 863a081850e56bbbb38d7b68b478a3bd40779723.
The design of this feature has conceptual problems, and it also works
incorrectly. See PARQUET-1381 for more details.
---
.../parquet/column/impl/ColumnReadStoreImpl.java | 2 +-
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 5 -
.../apache/parquet/hadoop/ParquetFileReader.java | 92 ++-----
.../apache/parquet/hadoop/ParquetFileWriter.java | 123 ---------
.../apache/parquet/hadoop/util/BlocksCombiner.java | 106 --------
.../hadoop/TestParquetWriterMergeBlocks.java | 280 ---------------------
.../apache/parquet/tools/command/MergeCommand.java | 75 +-----
7 files changed, 24 insertions(+), 659 deletions(-)
diff --git
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index b7e1597..755985d 100644
---
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -85,7 +85,7 @@ public class ColumnReadStoreImpl implements ColumnReadStore {
}
}
- public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader
pageReader) {
+ private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path,
PageReader pageReader) {
PrimitiveConverter converter = getPrimitiveConverter(path);
return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index f87630b..f85d374 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -292,9 +292,4 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
}
}
- void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer)
throws IOException {
- ColumnChunkPageWriter pageWriter = writers.get(path);
- pageWriter.writeToFileWriter(writer);
- }
-
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 01867c6..8e205f6 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -42,7 +42,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -67,7 +66,6 @@ import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.DictionaryPageReadStore;
-import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.page.PageReadStore;
import
org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.filter2.compat.FilterCompat;
@@ -1408,7 +1406,27 @@ public class ParquetFileReader implements Closeable {
* @throws IOException if there is an error while reading from the stream
*/
public void readAll(SeekableInputStream f, ChunkListBuilder builder)
throws IOException {
- List<ByteBuffer> buffers = readBlocks(f, offset, length);
+ List<Chunk> result = new ArrayList<Chunk>(chunks.size());
+ f.seek(offset);
+
+ int fullAllocations = length / options.getMaxAllocationSize();
+ int lastAllocationSize = length % options.getMaxAllocationSize();
+
+ int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+ List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+ for (int i = 0; i < fullAllocations; i += 1) {
+
buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+ }
+
+ if (lastAllocationSize > 0) {
+ buffers.add(options.getAllocator().allocate(lastAllocationSize));
+ }
+
+ for (ByteBuffer buffer : buffers) {
+ f.readFully(buffer);
+ buffer.flip();
+ }
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
@@ -1428,72 +1446,4 @@ public class ParquetFileReader implements Closeable {
}
- /**
- * @param f file to read the blocks from
- * @return the ByteBuffer blocks
- * @throws IOException if there is an error while reading from the stream
- */
- List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length)
throws IOException {
- f.seek(offset);
-
- int fullAllocations = length / options.getMaxAllocationSize();
- int lastAllocationSize = length % options.getMaxAllocationSize();
-
- int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
- List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
- for (int i = 0; i < fullAllocations; i++) {
-
buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
- }
-
- if (lastAllocationSize > 0) {
- buffers.add(options.getAllocator().allocate(lastAllocationSize));
- }
-
- for (ByteBuffer buffer : buffers) {
- f.readFully(buffer);
- buffer.flip();
- }
- return buffers;
- }
-
- Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor
columnDescriptor) {
- BlockMetaData block = blocks.get(blockIndex);
- if (block.getRowCount() == 0) {
- throw new RuntimeException("Illegal row group of 0 rows");
- }
- Optional<ColumnChunkMetaData> mc = findColumnByPath(block,
columnDescriptor.getPath());
-
- return mc.map(column -> new ChunkDescriptor(columnDescriptor, column,
column.getStartingPos(), (int) column.getTotalSize()))
- .map(chunk -> readChunk(f, chunk));
- }
-
- private ColumnChunkPageReader readChunk(SeekableInputStream f,
ChunkDescriptor descriptor) {
- try {
- List<ByteBuffer> buffers = readBlocks(f, descriptor.fileOffset,
descriptor.size);
- ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
- Chunk chunk = new WorkaroundChunk(descriptor,
stream.sliceBuffers(descriptor.size), f, null);
- return chunk.readAllPages();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block,
String[] path) {
- for (ColumnChunkMetaData column : block.getColumns()) {
- if (Arrays.equals(column.getPath().toArray(), path)) {
- return Optional.of(column);
- }
- }
- return Optional.empty();
- }
-
- public int blocksCount() {
- return blocks.size();
- }
-
- public BlockMetaData getBlockMetaData(int blockIndex) {
- return blocks.get(blockIndex);
- }
-
}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 14e3729..c875702 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -26,15 +26,12 @@ import static
org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -45,23 +42,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.Preconditions;
import org.apache.parquet.Strings;
import org.apache.parquet.Version;
-import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.ColumnWriteStore;
-import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.impl.ColumnReadStoreImpl;
-import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.example.DummyRecordConverter;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.Util;
@@ -72,7 +60,6 @@ import
org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.BlocksCombiner;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
@@ -669,116 +656,6 @@ public class ParquetFileWriter {
}
}
- public int merge(List<InputFile> inputFiles, CodecFactory.BytesCompressor
compressor, String createdBy, long maxBlockSize) throws IOException {
- List<ParquetFileReader> readers = getReaders(inputFiles);
- try {
- ByteBufferAllocator allocator = new HeapByteBufferAllocator();
- ColumnReadStoreImpl columnReadStore = new ColumnReadStoreImpl(null, new
DummyRecordConverter(schema).getRootConverter(), schema, createdBy);
- this.start();
- List<BlocksCombiner.SmallBlocksUnion> largeBlocks =
BlocksCombiner.combineLargeBlocks(readers, maxBlockSize);
- for (BlocksCombiner.SmallBlocksUnion smallBlocks : largeBlocks) {
- for (int columnIndex = 0; columnIndex < schema.getColumns().size();
columnIndex++) {
- ColumnDescriptor path = schema.getColumns().get(columnIndex);
- ColumnChunkPageWriteStore store = new
ColumnChunkPageWriteStore(compressor, schema, allocator,
ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
- ColumnWriteStoreV1 columnWriteStoreV1 = new
ColumnWriteStoreV1(schema, store, ParquetProperties.builder().build());
- for (BlocksCombiner.SmallBlock smallBlock : smallBlocks.getBlocks())
{
- ParquetFileReader parquetFileReader = smallBlock.getReader();
- try {
- Optional<PageReader> columnChunkPageReader =
parquetFileReader.readColumnInBlock(smallBlock.getBlockIndex(), path);
- ColumnWriter columnWriter =
columnWriteStoreV1.getColumnWriter(path);
- if (columnChunkPageReader.isPresent()) {
- ColumnReader columnReader =
columnReadStore.newMemColumnReader(path, columnChunkPageReader.get());
- for (int i = 0; i < columnReader.getTotalValueCount(); i++) {
- consumeTriplet(columnWriteStoreV1, columnWriter,
columnReader);
- }
- } else {
- MessageType inputFileSchema =
parquetFileReader.getFileMetaData().getSchema();
- String[] parentPath = getExisingParentPath(path,
inputFileSchema);
- int def =
parquetFileReader.getFileMetaData().getSchema().getMaxDefinitionLevel(parentPath);
- int rep =
parquetFileReader.getFileMetaData().getSchema().getMaxRepetitionLevel(parentPath);
- for (int i = 0; i <
parquetFileReader.getBlockMetaData(smallBlock.getBlockIndex()).getRowCount();
i++) {
- columnWriter.writeNull(rep, def);
- if (def == 0) {
- // V1 pages also respect record boundaries so we have to
mark them
- columnWriteStoreV1.endRecord();
- }
- }
- }
- } catch (Exception e) {
- LOG.error("File {} is not readable",
parquetFileReader.getFile(), e);
- }
- }
- if (columnIndex == 0) {
- this.startBlock(smallBlocks.getRowCount());
- }
- columnWriteStoreV1.flush();
- store.flushToFileWriter(path, this);
- }
- this.endBlock();
- }
- this.end(Collections.emptyMap());
- }finally {
- BlocksCombiner.closeReaders(readers);
- }
- return 0;
- }
-
- private String[] getExisingParentPath(ColumnDescriptor path, MessageType
inputFileSchema) {
- List<String> parentPath = Arrays.asList(path.getPath());
- while (parentPath.size() > 0 &&
!inputFileSchema.containsPath(parentPath.toArray(new
String[parentPath.size()]))) {
- parentPath = parentPath.subList(0, parentPath.size() - 1);
- }
- return parentPath.toArray(new String[parentPath.size()]);
- }
-
- private List<ParquetFileReader> getReaders(List<InputFile> inputFiles)
throws IOException {
- List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());
- for (InputFile inputFile : inputFiles) {
- readers.add(ParquetFileReader.open(inputFile));
- }
- return readers;
- }
-
- private void consumeTriplet(ColumnWriteStore columnWriteStore, ColumnWriter
columnWriter, ColumnReader columnReader) {
- int definitionLevel = columnReader.getCurrentDefinitionLevel();
- int repetitionLevel = columnReader.getCurrentRepetitionLevel();
- ColumnDescriptor column = columnReader.getDescriptor();
- PrimitiveType type = column.getPrimitiveType();
- if (definitionLevel < column.getMaxDefinitionLevel()) {
- columnWriter.writeNull(repetitionLevel, definitionLevel);
- } else {
- switch (type.getPrimitiveTypeName()) {
- case INT32:
- columnWriter.write(columnReader.getInteger(), repetitionLevel,
definitionLevel);
- break;
- case INT64:
- columnWriter.write(columnReader.getLong(), repetitionLevel,
definitionLevel);
- break;
- case BINARY:
- case FIXED_LEN_BYTE_ARRAY:
- case INT96:
- columnWriter.write(columnReader.getBinary(), repetitionLevel,
definitionLevel);
- break;
- case BOOLEAN:
- columnWriter.write(columnReader.getBoolean(), repetitionLevel,
definitionLevel);
- break;
- case FLOAT:
- columnWriter.write(columnReader.getFloat(), repetitionLevel,
definitionLevel);
- break;
- case DOUBLE:
- columnWriter.write(columnReader.getDouble(), repetitionLevel,
definitionLevel);
- break;
- default:
- throw new IllegalArgumentException("Unknown primitive type " + type);
- }
- }
- columnReader.consume();
- if (repetitionLevel == 0) {
- // V1 pages also respect record boundaries so we have to mark them
- columnWriteStore.endRecord();
- }
- }
-
/**
* @param file a file stream to read from
* @param rowGroups row groups to copy
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
deleted file mode 100644
index 02dadc7..0000000
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/BlocksCombiner.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.util;
-
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.util.Collections.unmodifiableList;
-
-public class BlocksCombiner {
-
- private static final Logger LOG =
LoggerFactory.getLogger(BlocksCombiner.class);
-
- public static List<SmallBlocksUnion>
combineLargeBlocks(List<ParquetFileReader> readers, long maxBlockSize) {
- List<SmallBlocksUnion> blocks = new ArrayList<>();
- long largeBlockSize = 0;
- long largeBlockRecords = 0;
- List<SmallBlock> smallBlocks = new ArrayList<>();
- for (ParquetFileReader reader : readers) {
- for (int blockIndex = 0; blockIndex < reader.blocksCount();
blockIndex++) {
- BlockMetaData block = reader.getBlockMetaData(blockIndex);
- if (!smallBlocks.isEmpty() && largeBlockSize +
block.getTotalByteSize() > maxBlockSize) {
- blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
- smallBlocks = new ArrayList<>();
- largeBlockSize = 0;
- largeBlockRecords = 0;
- }
- largeBlockSize += block.getTotalByteSize();
- largeBlockRecords += block.getRowCount();
- smallBlocks.add(new SmallBlock(reader, blockIndex));
- }
- }
- if (!smallBlocks.isEmpty()) {
- blocks.add(new SmallBlocksUnion(smallBlocks, largeBlockRecords));
- }
- return unmodifiableList(blocks);
- }
-
- public static void closeReaders(List<ParquetFileReader> readers) {
- readers.forEach(r -> {
- try {
- r.close();
- } catch (IOException e) {
- LOG.error("Error closing reader {}", r.getFile(), e);
- }
- });
- }
-
- public static class SmallBlocksUnion {
- private final List<SmallBlock> blocks;
- private final long rowCount;
-
- public SmallBlocksUnion(List<SmallBlock> blocks, long rowCount) {
- this.blocks = blocks;
- this.rowCount = rowCount;
- }
-
- public List<SmallBlock> getBlocks() {
- return blocks;
- }
-
- public long getRowCount() {
- return rowCount;
- }
- }
-
- public static class SmallBlock {
- private final ParquetFileReader reader;
- private final int blockIndex;
-
- public SmallBlock(ParquetFileReader reader, int blockIndex) {
- this.reader = reader;
- this.blockIndex = blockIndex;
- }
-
- public ParquetFileReader getReader() {
- return reader;
- }
-
- public int getBlockIndex() {
- return blockIndex;
- }
- }
-}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
deleted file mode 100644
index a972238..0000000
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterMergeBlocks.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.InputFile;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import static java.util.Arrays.asList;
-import static
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
-import static org.apache.parquet.schema.OriginalType.UTF8;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
-
-public class TestParquetWriterMergeBlocks {
-
- @Rule
- public TemporaryFolder temp = new TemporaryFolder();
-
- public static final int FILE_SIZE = 10000;
- public static final Configuration CONF = new Configuration();
- public static final Map<String, String> EMPTY_METADATA =
- new HashMap<String, String>();
- public static final MessageType FILE_SCHEMA = Types.buildMessage()
- .required(INT32).named("id")
- .required(BINARY).as(UTF8).named("string")
- .named("AppendTest");
- public static final SimpleGroupFactory GROUP_FACTORY =
- new SimpleGroupFactory(FILE_SCHEMA);
-
- public Path file1;
- public List<Group> file1content = new ArrayList<Group>();
- public Path file2;
- public List<Group> file2content = new ArrayList<Group>();
-
- @Before
- public void createSourceData() throws IOException {
- this.file1 = newTemp();
- this.file2 = newTemp();
-
- ParquetWriter<Group> writer1 = ExampleParquetWriter.builder(file1)
- .withType(FILE_SCHEMA)
- .build();
- ParquetWriter<Group> writer2 = ExampleParquetWriter.builder(file2)
- .withType(FILE_SCHEMA)
- .build();
-
- for (int i = 0; i < FILE_SIZE; i += 1) {
- Group group1 = GROUP_FACTORY.newGroup();
- group1.add("id", i);
- group1.add("string", UUID.randomUUID().toString());
- writer1.write(group1);
- file1content.add(group1);
-
- Group group2 = GROUP_FACTORY.newGroup();
- group2.add("id", FILE_SIZE+i);
- group2.add("string", UUID.randomUUID().toString());
- writer2.write(group2);
- file2content.add(group2);
- }
-
- writer1.close();
- writer2.close();
- }
-
- @Test
- public void testBasicBehavior() throws IOException {
- Path combinedFile = newTemp();
- ParquetFileWriter writer = new ParquetFileWriter(
- CONF, FILE_SCHEMA, combinedFile);
-
- // Merge schema and extraMeta
- List<Path> inputFiles = asList(file1, file2);
- FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles,
CONF).getFileMetaData();
- List<InputFile> inputFileList = toInputFiles(inputFiles);
- CodecFactory.BytesCompressor compressor = new CodecFactory(CONF,
DEFAULT_PAGE_SIZE).getCompressor(CompressionCodecName.SNAPPY);
-
- writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 *
1024 * 1024);
-
- LinkedList<Group> expected = new LinkedList<>();
- expected.addAll(file1content);
- expected.addAll(file2content);
-
- ParquetReader<Group> reader = ParquetReader
- .builder(new GroupReadSupport(), combinedFile)
- .build();
-
- Group next;
- while ((next = reader.read()) != null) {
- Group expectedNext = expected.removeFirst();
- // check each value; equals is not supported for simple records
- Assert.assertEquals("Each id should match",
- expectedNext.getInteger("id", 0), next.getInteger("id", 0));
- Assert.assertEquals("Each string should match",
- expectedNext.getString("string", 0), next.getString("string", 0));
- }
-
- Assert.assertEquals("All records should be present", 0, expected.size());
- }
-
- private List<InputFile> toInputFiles(List<Path> inputFiles) {
- return inputFiles.stream()
- .map(input -> {
- try {
- return HadoopInputFile.fromPath(input, CONF);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
- }
-
- @Test
- public void testMergedMetadata() throws IOException {
- Path combinedFile = newTemp();
- ParquetFileWriter writer = new ParquetFileWriter(
- CONF, FILE_SCHEMA, combinedFile);
-
- // Merge schema and extraMeta
- List<Path> inputFiles = asList(file1, file2);
- FileMetaData mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles,
CONF).getFileMetaData();
- List<InputFile> inputFileList = toInputFiles(inputFiles);
- CompressionCodecName codecName = CompressionCodecName.GZIP;
- CodecFactory.BytesCompressor compressor = new CodecFactory(CONF,
DEFAULT_PAGE_SIZE).getCompressor(codecName);
- writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(), 128 *
1024 * 1024);
-
- ParquetMetadata combinedFooter = ParquetFileReader.readFooter(
- CONF, combinedFile, NO_FILTER);
- ParquetMetadata f1Footer = ParquetFileReader.readFooter(
- CONF, file1, NO_FILTER);
- ParquetMetadata f2Footer = ParquetFileReader.readFooter(
- CONF, file2, NO_FILTER);
-
- LinkedList<BlockMetaData> expectedRowGroups = new LinkedList<>();
- expectedRowGroups.addAll(f1Footer.getBlocks());
- expectedRowGroups.addAll(f2Footer.getBlocks());
- long totalRowCount =
expectedRowGroups.stream().mapToLong(BlockMetaData::getRowCount).sum();
- Assert.assertEquals("Combined should have a single row group",
- 1,
- combinedFooter.getBlocks().size());
-
- BlockMetaData rowGroup = combinedFooter.getBlocks().get(0);
- Assert.assertEquals("Row count should match",
- totalRowCount, rowGroup.getRowCount());
- assertColumnsEquivalent(f1Footer.getBlocks().get(0).getColumns(),
rowGroup.getColumns(), codecName);
- }
-
- public void assertColumnsEquivalent(List<ColumnChunkMetaData> expected,
- List<ColumnChunkMetaData> actual,
- CompressionCodecName codecName) {
- Assert.assertEquals("Should have the expected columns",
- expected.size(), actual.size());
- for (int i = 0; i < actual.size(); i += 1) {
- long numNulls = 0;
- long valueCount = 0;
- ColumnChunkMetaData current = actual.get(i);
- Statistics statistics = current.getStatistics();
- numNulls += statistics.getNumNulls();
- valueCount += current.getValueCount();
- if (i != 0) {
- ColumnChunkMetaData previous = actual.get(i - 1);
- long expectedStart = previous.getStartingPos() +
previous.getTotalSize();
- Assert.assertEquals("Should start after the previous column",
- expectedStart, current.getStartingPos());
- }
-
- assertColumnMetadataEquivalent(expected.get(i), current, codecName,
numNulls, valueCount);
- }
- }
-
- public void assertColumnMetadataEquivalent(ColumnChunkMetaData expected,
- ColumnChunkMetaData actual,
- CompressionCodecName codecName,
- long numNulls,
- long valueCount) {
- Assert.assertEquals("Should be the expected column",
- expected.getPath(), expected.getPath());
- Assert.assertEquals("Primitive type should not change",
- expected.getType(), actual.getType());
- Assert.assertEquals("Compression codec should not change",
- codecName, actual.getCodec());
- Assert.assertEquals("Data encodings should not change",
- expected.getEncodings(), actual.getEncodings());
- Assert.assertEquals("Statistics should not change",
- numNulls, actual.getStatistics().getNumNulls());
- Assert.assertEquals("Number of values should not change",
- valueCount, actual.getValueCount());
-
- }
-
- @Test
- public void testAllowDroppingColumns() throws IOException {
- MessageType droppedColumnSchema = Types.buildMessage()
- .required(BINARY).as(UTF8).named("string")
- .named("AppendTest");
-
- Path droppedColumnFile = newTemp();
- List<Path> inputFiles = asList(file1, file2);
- ParquetFileWriter writer = new ParquetFileWriter(
- CONF, droppedColumnSchema, droppedColumnFile);
- List<InputFile> inputFileList = toInputFiles(inputFiles);
- CompressionCodecName codecName = CompressionCodecName.GZIP;
- CodecFactory.BytesCompressor compressor = new CodecFactory(CONF,
DEFAULT_PAGE_SIZE).getCompressor(codecName);
- writer.merge(inputFileList, compressor, "", 128*1024*1024);
-
- LinkedList<Group> expected = new LinkedList<Group>();
- expected.addAll(file1content);
- expected.addAll(file2content);
-
- ParquetMetadata footer = ParquetFileReader.readFooter(
- CONF, droppedColumnFile, NO_FILTER);
- for (BlockMetaData rowGroup : footer.getBlocks()) {
- Assert.assertEquals("Should have only the string column",
- 1, rowGroup.getColumns().size());
- }
-
- ParquetReader<Group> reader = ParquetReader
- .builder(new GroupReadSupport(), droppedColumnFile)
- .build();
-
- Group next;
- while ((next = reader.read()) != null) {
- Group expectedNext = expected.removeFirst();
- Assert.assertEquals("Each string should match",
- expectedNext.getString("string", 0), next.getString("string", 0));
- }
-
- Assert.assertEquals("All records should be present", 0, expected.size());
- }
-
- private Path newTemp() throws IOException {
- File file = temp.newFile();
- Preconditions.checkArgument(file.delete(), "Could not remove temp file");
- return new Path(file.toString());
- }
-}
diff --git
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
index 6d5b313..fe64587 100644
---
a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
+++
b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -19,29 +19,20 @@
package org.apache.parquet.tools.command;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.io.InputFile;
import org.apache.parquet.tools.Main;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
-
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
public class MergeCommand extends ArgsOnlyCommand {
public static final String[] USAGE = new String[] {
@@ -58,32 +49,6 @@ public class MergeCommand extends ArgsOnlyCommand {
private Configuration conf;
- private static final Options OPTIONS;
- static {
- OPTIONS = new Options();
-
- Option block = Option.builder("b")
- .longOpt("block")
- .desc("Merge adjacent blocks into one up to upper bound size limit
default to 128 MB")
- .build();
-
- Option limit = Option.builder("l")
- .longOpt("limit")
- .desc("Upper bound for merged block size in megabytes. Default: 128 MB")
- .hasArg()
- .build();
-
- Option codec = Option.builder("c")
- .longOpt("codec")
- .desc("Compression codec name. Default: SNAPPY. Valid values:
UNCOMPRESSED, SNAPPY, GZIP, LZO, BROTLI, LZ4, ZSTD")
- .hasArg()
- .build();
-
- OPTIONS.addOption(limit);
- OPTIONS.addOption(block);
- OPTIONS.addOption(codec);
- }
-
public MergeCommand() {
super(2, MAX_FILE_NUM + 1);
@@ -91,11 +56,6 @@ public class MergeCommand extends ArgsOnlyCommand {
}
@Override
- public Options getOptions() {
- return OPTIONS;
- }
-
- @Override
public String[] getUsageDescription() {
return USAGE;
}
@@ -103,32 +63,18 @@ public class MergeCommand extends ArgsOnlyCommand {
@Override
public String getCommandDescription() {
return "Merges multiple Parquet files into one. " +
- "Without -b option the command doesn't merge row groups, just places one
after the other. " +
+ "The command doesn't merge row groups, just places one after the other.
" +
"When used to merge many small files, the resulting file will still
contain small row groups, " +
- "which usually leads to bad query performance. " +
- "To have adjacent small blocks merged together use -b option. " +
- "Blocks will be grouped into larger one until the upper bound is
reached. " +
- "Default block upper bound 128 MB and default compression SNAPPY can be
customized using -l and -c options";
+ "which usually leads to bad query performance.";
}
@Override
public void execute(CommandLine options) throws Exception {
- boolean mergeBlocks = options.hasOption('b');
- int maxBlockSize = options.hasOption('l')?
Integer.parseInt(options.getOptionValue('l')) * 1024 * 1024 :
DEFAULT_BLOCK_SIZE;
- CompressionCodecName compressionCodec = options.hasOption('c') ?
CompressionCodecName.valueOf(options.getOptionValue('c')) :
CompressionCodecName.SNAPPY;
// Prepare arguments
List<String> args = options.getArgList();
List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
Path outputFile = new Path(args.get(args.size() - 1));
- if (mergeBlocks) {
- CodecFactory.BytesCompressor compressor = new CodecFactory(conf,
DEFAULT_PAGE_SIZE).getCompressor(compressionCodec);
- mergeBlocks(maxBlockSize, compressor, inputFiles, outputFile);
- } else {
- mergeFiles(inputFiles, outputFile);
- }
- }
- private void mergeFiles(List<Path> inputFiles, Path outputFile) throws
IOException {
// Merge schema and extraMeta
FileMetaData mergedMeta = mergedMetadata(inputFiles);
PrintWriter out = new PrintWriter(Main.out, true);
@@ -157,23 +103,6 @@ public class MergeCommand extends ArgsOnlyCommand {
writer.end(mergedMeta.getKeyValueMetaData());
}
- private void mergeBlocks(int maxBlockSize, CodecFactory.BytesCompressor
compressor, List<Path> inputFiles, Path outputFile) throws IOException {
- // Merge schema and extraMeta
- FileMetaData mergedMeta = mergedMetadata(inputFiles);
-
- // Merge data
- ParquetFileWriter writer = new ParquetFileWriter(conf,
mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
- List<InputFile> inputFileList = inputFiles.stream()
- .map(input -> {
- try {
- return HadoopInputFile.fromPath(input, conf);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
- writer.merge(inputFileList, compressor, mergedMeta.getCreatedBy(),
maxBlockSize);
- }
-
private FileMetaData mergedMetadata(List<Path> inputFiles) throws
IOException {
return ParquetFileWriter.mergeMetadataFiles(inputFiles,
conf).getFileMetaData();
}