This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ba0c0e01853 Add Parquet output format to segment converter tool (#17990)
ba0c0e01853 is described below
commit ba0c0e0185309337cb28f4f7d98af58bfb174d12
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Mar 27 01:07:58 2026 -0700
Add Parquet output format to segment converter tool (#17990)
* Add Parquet output format support to PinotSegmentConvertCommand
Add PinotSegmentToParquetConverter that converts Pinot segments to
Parquet format using AvroParquetWriter with GZIP compression. This
extends the existing segment converter tool to support PARQUET as
an output format alongside AVRO, CSV, and JSON.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Add configurable compression codec for Parquet output
Add -parquetCompression CLI option supporting GZIP, SNAPPY, ZSTD,
LZ4, and UNCOMPRESSED codecs (defaults to GZIP). The compression
codec is passed through to the AvroParquetWriter.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
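The typed-option change above can be sketched in isolation. picocli converts enum-typed options via `Enum.valueOf`, so an unknown codec name fails at parse time instead of reaching the writer. The `Codec` enum below is a stand-in for Parquet's `CompressionCodecName`, and `parseCodec` is a hypothetical helper approximating what the CLI layer does; neither is the actual Pinot or picocli code.

```java
// Standalone sketch: why an enum-typed -parquetCompression option gets
// validation for free. Codec is a stand-in for CompressionCodecName.
public class CompressionOptionSketch {
  enum Codec { GZIP, SNAPPY, ZSTD, LZ4, UNCOMPRESSED }

  // Roughly what an enum-aware CLI parser does with the raw option value.
  static Codec parseCodec(String raw) {
    try {
      return Codec.valueOf(raw);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Invalid codec '" + raw + "', expected one of " + java.util.Arrays.toString(Codec.values()));
    }
  }

  public static void main(String[] args) {
    System.out.println(parseCodec("GZIP"));
  }
}
```

With a plain `String` option, a typo like `-parquetCompression GZIPP` would only surface when the Parquet writer was constructed.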
* Add forward-index-only loading mode for segment converter
Add -forwardIndexOnly CLI option that loads only forward index,
dictionary, and null value vector when reading segments. This skips
secondary indexes (inverted, range, bloom, text, etc.) for faster
loading and better tolerance of segments with missing indexes.
Changes:
- IndexLoadingConfig: add forwardIndexOnly flag
- PhysicalColumnIndexContainer: skip non-essential indexes when flag set
- ImmutableSegmentLoader: add load overload with forwardIndexOnly param
- PinotSegmentRecordReader: add init overload with forwardIndexOnly param
- PinotSegmentToParquetConverter: pass through forwardIndexOnly option
Co-Authored-By: Claude Opus 4.6 <[email protected]>
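The filtering described above can be sketched as a standalone snippet: of all registered index types, only the ones needed to read raw data are kept when the flag is set. The id strings below are illustrative placeholders, not Pinot's actual `StandardIndexes` constants, and `indexesToLoad` is a hypothetical helper, not the real `PhysicalColumnIndexContainer` logic.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Standalone sketch of the forward-index-only filter: keep only the index
// types required to read data; skip all column-level secondary indexes.
public class ForwardIndexOnlySketch {
  static final Set<String> FORWARD_INDEX_ONLY_TYPES =
      Set.of("forward", "dictionary", "null_value_vector");

  static List<String> indexesToLoad(List<String> allIndexIds, boolean forwardIndexOnly) {
    if (!forwardIndexOnly) {
      return allIndexIds; // normal path: consider every index type
    }
    // skip secondary indexes (inverted, range, bloom, text, ...) entirely
    return allIndexIds.stream()
        .filter(FORWARD_INDEX_ONLY_TYPES::contains)
        .collect(Collectors.toList());
  }
}
```

Because missing secondary indexes are never touched on this path, segments with absent or stale index files can still be read.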
* Address review: reuse utility, typed enum option, add Parquet test
- Use SegmentProcessorAvroUtils.convertGenericRowToAvroRecord() instead
of duplicating GenericRow to Avro Record conversion logic
- Cache ParquetUtils.getParquetHadoopConfiguration() in a local variable
to avoid repeated allocation
- Change -parquetCompression CLI option from String to CompressionCodecName
for picocli validation of enum values
- Add testParquetConverter to PinotSegmentConverterTest that verifies
SV/MV and BYTES column handling via round-trip conversion
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Address review: clarify forwardIndexOnly scope, add reader test
- Update -forwardIndexOnly description to clarify it only applies to
PARQUET output format and is ignored for AVRO/CSV/JSON
- Add testPinotSegmentRecordReaderForwardIndexOnly to verify that
PinotSegmentRecordReader reads all rows correctly when loading
with forwardIndexOnly=true
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Apply forwardIndexOnly to all converters (AVRO/CSV/JSON/PARQUET)
For data dump use cases, only the forward index is needed regardless
of output format. Add forwardIndexOnly support to all converters:
- PinotSegmentToAvroConverter: also reuse SegmentProcessorAvroUtils
- PinotSegmentToCsvConverter
- PinotSegmentToJsonConverter
- Update CLI description to reflect it applies to all formats
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* Fix csvListDelimiter bug and clarify forwardIndexOnly Javadocs
- Fix pre-existing bug where _csvDelimiter was passed as both row
delimiter and list delimiter, making -csvListDelimiter ignored
- Clarify Javadocs on ImmutableSegmentLoader and PinotSegmentRecordReader
to note that forwardIndexOnly skips column-level secondary indexes
but segment-level indexes (star-tree, multi-column text) are still
loaded when present
Co-Authored-By: Claude Opus 4.6 <[email protected]>
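The delimiter bug above can be illustrated with a standalone sketch: multi-value cells must be joined with the list delimiter, not the row delimiter, or the row gains phantom columns. `formatRow` is a hypothetical helper mirroring the intent of the fix, not the converter's actual code.

```java
// Standalone sketch of the csvListDelimiter fix: a multi-value cell is
// flattened with the list delimiter, keeping the row delimiter unambiguous.
public class CsvDelimiterSketch {
  static String formatRow(Object[] values, char delimiter, char listDelimiter) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < values.length; i++) {
      if (i > 0) {
        sb.append(delimiter);
      }
      if (values[i] instanceof Object[]) {
        Object[] mv = (Object[]) values[i];
        for (int j = 0; j < mv.length; j++) {
          if (j > 0) {
            sb.append(listDelimiter); // the fix: list delimiter inside a cell
          }
          sb.append(mv[j]);
        }
      } else {
        sb.append(values[i]);
      }
    }
    return sb.toString();
  }
}
```

With the bug (passing the row delimiter twice), the same input would render as `a,x,y,b` and parse as four columns instead of three.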
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../immutable/ImmutableSegmentLoader.java | 12 +++
.../index/column/PhysicalColumnIndexContainer.java | 9 +++
.../segment/index/loader/IndexLoadingConfig.java | 9 +++
.../segment/readers/PinotSegmentRecordReader.java | 17 ++++-
.../readers/PinotSegmentRecordReaderTest.java | 26 +++++++
.../converter/PinotSegmentConvertCommand.java | 27 +++++--
.../converter/PinotSegmentToAvroConverter.java | 29 +++-----
.../converter/PinotSegmentToCsvConverter.java | 12 ++-
.../converter/PinotSegmentToJsonConverter.java | 11 ++-
.../converter/PinotSegmentToParquetConverter.java | 85 ++++++++++++++++++++++
.../converter/PinotSegmentConverterTest.java | 29 ++++++++
11 files changed, 238 insertions(+), 28 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index abd37a0e9bc..20c16ad962d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -68,8 +68,20 @@ public class ImmutableSegmentLoader {
*/
public static ImmutableSegment load(File indexDir, ReadMode readMode)
throws Exception {
+ return load(indexDir, readMode, false);
+ }
+
+ /**
+ * Loads the segment in read-only mode with an option to load only column-level forward index, dictionary,
+ * and null value vector, skipping other column-level secondary indexes. Segment-level indexes (such as
+ * star-tree or multi-column text index) are still loaded when present. This is useful for tools like segment
+ * converters that only need to read data without requiring column-level secondary indexes.
+ */
+ public static ImmutableSegment load(File indexDir, ReadMode readMode, boolean forwardIndexOnly)
+ throws Exception {
IndexLoadingConfig defaultIndexLoadingConfig = new IndexLoadingConfig();
defaultIndexLoadingConfig.setReadMode(readMode);
+ defaultIndexLoadingConfig.setForwardIndexOnly(forwardIndexOnly);
return load(indexDir, defaultIndexLoadingConfig, false, null, null);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index edc996c7a02..e1cd12af0ed 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -23,6 +23,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.readers.text.MultiColumnLuceneTextIndexReader;
@@ -33,6 +34,7 @@ import org.apache.pinot.segment.spi.index.IndexReaderConstraintException;
import org.apache.pinot.segment.spi.index.IndexReaderFactory;
import org.apache.pinot.segment.spi.index.IndexService;
import org.apache.pinot.segment.spi.index.IndexType;
+import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.slf4j.Logger;
@@ -42,6 +44,9 @@ import org.slf4j.LoggerFactory;
public final class PhysicalColumnIndexContainer implements ColumnIndexContainer {
private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalColumnIndexContainer.class);
+ private static final Set<String> FORWARD_INDEX_ONLY_TYPES =
+ Set.of(StandardIndexes.FORWARD_ID, StandardIndexes.DICTIONARY_ID, StandardIndexes.NULL_VALUE_VECTOR_ID);
+
private final IndexTypeMap _indexTypeMap;
// Reference to shared segment-level multi-column text index reader.
@@ -61,8 +66,12 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
ArrayList<IndexType> indexTypes = new ArrayList<>();
ArrayList<IndexReader> readers = new ArrayList<>();
+ boolean forwardIndexOnly = indexLoadingConfig.isForwardIndexOnly();
try {
for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
+ if (forwardIndexOnly && !FORWARD_INDEX_ONLY_TYPES.contains(indexType.getId())) {
+ continue;
+ }
if (segmentReader.hasIndexFor(columnName, indexType)) {
IndexReaderFactory<?> readerProvider = indexType.getReaderFactory();
try {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 51f17d66914..a2565f09ddc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -66,6 +66,7 @@ public class IndexLoadingConfig {
private Set<String> _knownColumns;
private String _tableDataDir;
private boolean _errorOnColumnBuildFailure;
+ private boolean _forwardIndexOnly;
// Initialized by instance data manager config
private String _instanceId;
@@ -349,6 +350,14 @@ public class IndexLoadingConfig {
_errorOnColumnBuildFailure = errorOnColumnBuildFailure;
}
+ public boolean isForwardIndexOnly() {
+ return _forwardIndexOnly;
+ }
+
+ public void setForwardIndexOnly(boolean forwardIndexOnly) {
+ _forwardIndexOnly = forwardIndexOnly;
+ }
+
public boolean isSkipSegmentPreprocess() {
return _tableConfig != null && _tableConfig.getIndexingConfig().isSkipSegmentPreprocess();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
index f1e5834205f..d284ce76d71 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReader.java
@@ -117,9 +117,24 @@ public class PinotSegmentRecordReader implements RecordReader {
*/
public void init(File indexDir, @Nullable Set<String> fieldsToRead, @Nullable List<String> sortOrder,
boolean skipDefaultNullValues) {
+ init(indexDir, fieldsToRead, sortOrder, skipDefaultNullValues, false);
+ }
+
+ /**
+ * Initializes the record reader from an index directory with an option to skip column-level secondary indexes.
+ *
+ * @param indexDir Index directory
+ * @param fieldsToRead The fields to read from the segment. If null or empty, reads all fields
+ * @param sortOrder List of sorted columns
+ * @param skipDefaultNullValues Whether to skip putting default null values into the record
+ * @param forwardIndexOnly Whether to load only column-level forward index, dictionary, and null value vector,
+ * skipping other column-level secondary indexes
+ */
+ public void init(File indexDir, @Nullable Set<String> fieldsToRead, @Nullable List<String> sortOrder,
+ boolean skipDefaultNullValues, boolean forwardIndexOnly) {
IndexSegment indexSegment;
try {
- indexSegment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap);
+ indexSegment = ImmutableSegmentLoader.load(indexDir, ReadMode.mmap, forwardIndexOnly);
} catch (Exception e) {
throw new RuntimeException("Caught exception while loading the segment from: " + indexDir, e);
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReaderTest.java
index b888cede9b2..4238eed0e11 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReaderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentRecordReaderTest.java
@@ -128,6 +128,32 @@ public class PinotSegmentRecordReaderTest {
}
}
+ @Test
+ public void testPinotSegmentRecordReaderForwardIndexOnly()
+ throws Exception {
+ List<GenericRow> outputRows = new ArrayList<>();
+
+ PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader();
+ pinotSegmentRecordReader.init(_segmentIndexDir, null, null, false, true);
+ try (pinotSegmentRecordReader) {
+ while (pinotSegmentRecordReader.hasNext()) {
+ outputRows.add(pinotSegmentRecordReader.next());
+ }
+ }
+
+ Assert.assertEquals(outputRows.size(), _rows.size(),
+ "Number of rows returned by PinotSegmentRecordReader with forwardIndexOnly is incorrect");
+ for (int i = 0; i < outputRows.size(); i++) {
+ GenericRow outputRow = outputRows.get(i);
+ GenericRow row = _rows.get(i);
+ Assert.assertEquals(outputRow.getValue(D_SV_1), row.getValue(D_SV_1));
+ Assert.assertTrue(PinotSegmentUtil.compareMultiValueColumn(outputRow.getValue(D_MV_1), row.getValue(D_MV_1)));
+ Assert.assertEquals(outputRow.getValue(M1), row.getValue(M1));
+ Assert.assertEquals(outputRow.getValue(M2), row.getValue(M2));
+ Assert.assertEquals(outputRow.getValue(TIME), row.getValue(TIME));
+ }
+ }
+
@AfterClass
public void cleanup() {
FileUtils.deleteQuietly(new File(_segmentOutputDir));
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
index 2176e61a2bd..e4f8dfb99ae 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentConvertCommand.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.tools.AbstractBaseCommand;
@@ -38,6 +39,7 @@ import picocli.CommandLine;
* <li>AVRO format</li>
* <li>CSV format</li>
* <li>JSON format</li>
+ * <li>PARQUET format</li>
* </ul>
*/
@SuppressWarnings("FieldCanBeLocal")
@@ -54,7 +56,7 @@ public class PinotSegmentConvertCommand extends AbstractBaseCommand implements C
private String _outputDir;
@CommandLine.Option(names = {"-outputFormat"}, required = true,
- description = "Format to convert to (AVRO/CSV/JSON).")
+ description = "Format to convert to (AVRO/CSV/JSON/PARQUET).")
private String _outputFormat;
@CommandLine.Option(names = {"-csvDelimiter"}, required = false, description = "CSV delimiter (default ',').")
@@ -67,6 +69,14 @@ public class PinotSegmentConvertCommand extends AbstractBaseCommand implements C
@CommandLine.Option(names = {"-csvWithHeader"}, required = false, description = "Print CSV Header (default false).")
private boolean _csvWithHeader;
+ @CommandLine.Option(names = {"-parquetCompression"}, required = false,
+ description = "Parquet compression codec (GZIP/SNAPPY/ZSTD/LZ4/UNCOMPRESSED, default GZIP).")
+ private CompressionCodecName _parquetCompression = CompressionCodecName.GZIP;
+
+ @CommandLine.Option(names = {"-forwardIndexOnly"}, required = false,
+ description = "Load only forward index from the segment, skipping secondary indexes (default false).")
+ private boolean _forwardIndexOnly;
+
@CommandLine.Option(names = {"-overwrite"}, required = false,
description = "Overwrite the existing file (default false).")
private boolean _overwrite;
@@ -127,16 +137,21 @@ public class PinotSegmentConvertCommand extends AbstractBaseCommand implements C
switch (FileFormat.fromString(_outputFormat)) {
case AVRO:
outputPath += ".avro";
- new PinotSegmentToAvroConverter(inputPath, outputPath).convert();
+ new PinotSegmentToAvroConverter(inputPath, outputPath, _forwardIndexOnly).convert();
break;
case CSV:
outputPath += ".csv";
- new PinotSegmentToCsvConverter(inputPath, outputPath, _csvDelimiter, _csvDelimiter, _csvWithHeader)
- .convert();
+ new PinotSegmentToCsvConverter(inputPath, outputPath, _csvDelimiter, _csvListDelimiter, _csvWithHeader,
+ _forwardIndexOnly).convert();
break;
case JSON:
outputPath += ".json";
- new PinotSegmentToJsonConverter(inputPath, outputPath).convert();
+ new PinotSegmentToJsonConverter(inputPath, outputPath, _forwardIndexOnly).convert();
+ break;
+ case PARQUET:
+ outputPath += ".parquet";
+ new PinotSegmentToParquetConverter(inputPath, outputPath, _parquetCompression, _forwardIndexOnly)
+ .convert();
break;
default:
throw new RuntimeException("Unsupported conversion to file format: " + _outputFormat);
@@ -152,6 +167,6 @@ public class PinotSegmentConvertCommand extends AbstractBaseCommand implements C
@Override
public String description() {
- return "Convert Pinot segments to another format such as AVRO/CSV/JSON.";
+ return "Convert Pinot segments to another format such as AVRO/CSV/JSON/PARQUET.";
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
index f988f050ab9..dd0d56d2cf4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToAvroConverter.java
@@ -19,13 +19,11 @@
package org.apache.pinot.tools.segment.converter;
import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -38,10 +36,16 @@ import org.apache.pinot.spi.data.readers.GenericRow;
public class PinotSegmentToAvroConverter implements PinotSegmentConverter {
private final String _segmentDir;
private final String _outputFile;
+ private final boolean _forwardIndexOnly;
public PinotSegmentToAvroConverter(String segmentDir, String outputFile) {
+ this(segmentDir, outputFile, false);
+ }
+
+ public PinotSegmentToAvroConverter(String segmentDir, String outputFile, boolean forwardIndexOnly) {
_segmentDir = segmentDir;
_outputFile = outputFile;
+ _forwardIndexOnly = forwardIndexOnly;
}
@Override
@@ -49,26 +53,17 @@ public class PinotSegmentToAvroConverter implements PinotSegmentConverter {
throws Exception {
File indexDir = new File(_segmentDir);
Schema avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(new SegmentMetadataImpl(indexDir).getSchema());
- try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(new File(_segmentDir))) {
+ PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader();
+ pinotSegmentRecordReader.init(indexDir, null, null, false, _forwardIndexOnly);
+ try (pinotSegmentRecordReader) {
try (DataFileWriter<Record> recordWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
recordWriter.create(avroSchema, new File(_outputFile));
GenericRow row = new GenericRow();
+ Record reusableRecord = new Record(avroSchema);
while (pinotSegmentRecordReader.hasNext()) {
row = pinotSegmentRecordReader.next(row);
- Record record = new Record(avroSchema);
- for (Map.Entry<String, Object> entry : row.getFieldToValueMap().entrySet()) {
- String field = entry.getKey();
- Object value = entry.getValue();
- if (value instanceof Object[]) {
- record.put(field, Arrays.asList((Object[]) value));
- } else if (value instanceof byte[]) {
- record.put(field, ByteBuffer.wrap((byte[]) value));
- } else {
- record.put(field, value);
- }
- }
-
+ Record record = SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(row, reusableRecord);
recordWriter.append(record);
row.clear();
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
index 439cfb991ed..022a0d6fc9f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToCsvConverter.java
@@ -37,21 +37,29 @@ public class PinotSegmentToCsvConverter implements PinotSegmentConverter {
private final char _delimiter;
private final char _listDelimiter;
private final boolean _withHeader;
+ private final boolean _forwardIndexOnly;
PinotSegmentToCsvConverter(String segmentDir, String outputFile, char delimiter, char listDelimiter,
boolean withHeader) {
+ this(segmentDir, outputFile, delimiter, listDelimiter, withHeader, false);
+ }
+
+ PinotSegmentToCsvConverter(String segmentDir, String outputFile, char delimiter, char listDelimiter,
+ boolean withHeader, boolean forwardIndexOnly) {
_segmentDir = segmentDir;
_outputFile = outputFile;
_delimiter = delimiter;
_listDelimiter = listDelimiter;
_withHeader = withHeader;
+ _forwardIndexOnly = forwardIndexOnly;
}
@Override
public void convert()
throws Exception {
- try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader(new File(_segmentDir));
- BufferedWriter recordWriter = new BufferedWriter(new FileWriter(_outputFile))) {
+ PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+ recordReader.init(new File(_segmentDir), null, null, false, _forwardIndexOnly);
+ try (recordReader; BufferedWriter recordWriter = new BufferedWriter(new FileWriter(_outputFile))) {
GenericRow row = new GenericRow();
row = recordReader.next(row);
String[] fields = row.getFieldToValueMap().keySet().toArray(new String[0]);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
index e0abf097496..f6cf356adc9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToJsonConverter.java
@@ -35,17 +35,24 @@ import org.apache.pinot.spi.utils.JsonUtils;
public class PinotSegmentToJsonConverter implements PinotSegmentConverter {
private final String _segmentDir;
private final String _outputFile;
+ private final boolean _forwardIndexOnly;
public PinotSegmentToJsonConverter(String segmentDir, String outputFile) {
+ this(segmentDir, outputFile, false);
+ }
+
+ public PinotSegmentToJsonConverter(String segmentDir, String outputFile, boolean forwardIndexOnly) {
_segmentDir = segmentDir;
_outputFile = outputFile;
+ _forwardIndexOnly = forwardIndexOnly;
}
@Override
public void convert()
throws Exception {
- try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader(new File(_segmentDir));
- BufferedWriter recordWriter = new BufferedWriter(new FileWriter(_outputFile))) {
+ PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+ recordReader.init(new File(_segmentDir), null, null, false, _forwardIndexOnly);
+ try (recordReader; BufferedWriter recordWriter = new BufferedWriter(new FileWriter(_outputFile))) {
GenericRow row = new GenericRow();
while (recordReader.hasNext()) {
row = recordReader.next(row);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToParquetConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToParquetConverter.java
new file mode 100644
index 00000000000..8a901d56779
--- /dev/null
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/PinotSegmentToParquetConverter.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.segment.converter;
+
+import java.io.File;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
+import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.plugin.inputformat.parquet.ParquetUtils;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * The <code>PinotSegmentToParquetConverter</code> class is the tool to convert Pinot segment to Parquet format.
+ */
+public class PinotSegmentToParquetConverter implements PinotSegmentConverter {
+ private final String _segmentDir;
+ private final String _outputFile;
+ private final CompressionCodecName _compressionCodec;
+ private final boolean _forwardIndexOnly;
+
+ public PinotSegmentToParquetConverter(String segmentDir, String outputFile) {
+ this(segmentDir, outputFile, CompressionCodecName.GZIP, false);
+ }
+
+ public PinotSegmentToParquetConverter(String segmentDir, String outputFile,
+ CompressionCodecName compressionCodec, boolean forwardIndexOnly) {
+ _segmentDir = segmentDir;
+ _outputFile = outputFile;
+ _compressionCodec = compressionCodec;
+ _forwardIndexOnly = forwardIndexOnly;
+ }
+
+ @Override
+ public void convert()
+ throws Exception {
+ File indexDir = new File(_segmentDir);
+ Schema avroSchema = AvroUtils.getAvroSchemaFromPinotSchema(new SegmentMetadataImpl(indexDir).getSchema());
+ Configuration hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
+ OutputFile outputFile = HadoopOutputFile.fromPath(new Path(_outputFile), hadoopConf);
+ PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader();
+ pinotSegmentRecordReader.init(new File(_segmentDir), null, null, false, _forwardIndexOnly);
+ try (pinotSegmentRecordReader) {
+ try (ParquetWriter<Record> parquetWriter =
+ AvroParquetWriter.<Record>builder(outputFile).withSchema(avroSchema)
+ .withCompressionCodec(_compressionCodec)
+ .withConf(hadoopConf).build()) {
+ GenericRow row = new GenericRow();
+ Record reusableRecord = new Record(avroSchema);
+ while (pinotSegmentRecordReader.hasNext()) {
+ row = pinotSegmentRecordReader.next(row);
+ Record record = SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(row, reusableRecord);
+ parquetWriter.write(record);
+ row.clear();
+ }
+ }
+ }
+ }
+}
diff --git a/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java b/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
index f4a76a70188..1cae431f614 100644
--- a/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
+++ b/pinot-tools/src/test/java/org/apache/pinot/tools/segment/converter/PinotSegmentConverterTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReader;
import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
import org.apache.pinot.plugin.inputformat.json.JSONRecordReader;
+import org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -187,6 +188,34 @@ public class PinotSegmentConverterTest {
}
}
+ @Test
+ public void testParquetConverter()
+ throws Exception {
+ File outputFile = new File(TEMP_DIR, "segment.parquet");
+ PinotSegmentToParquetConverter parquetConverter =
+ new PinotSegmentToParquetConverter(_segmentDir, outputFile.getPath());
+ parquetConverter.convert();
+
+ try (ParquetRecordReader recordReader = new ParquetRecordReader()) {
+ recordReader.init(outputFile, SCHEMA.getFieldSpecMap().keySet(), null);
+
+ GenericRow record = recordReader.next();
+ assertEquals(record.getValue(INT_SV_COLUMN), 1);
+ assertEquals(record.getValue(LONG_SV_COLUMN), 2L);
+ assertEquals(record.getValue(FLOAT_SV_COLUMN), 3.0f);
+ assertEquals(record.getValue(DOUBLE_SV_COLUMN), 4.0);
+ assertEquals(record.getValue(STRING_SV_COLUMN), "5");
+ assertEquals(record.getValue(BYTES_SV_COLUMN), new byte[]{6, 12, 34, 56});
+ assertEquals(record.getValue(INT_MV_COLUMN), new Object[]{7, 8});
+ assertEquals(record.getValue(LONG_MV_COLUMN), new Object[]{9L, 10L});
+ assertEquals(record.getValue(FLOAT_MV_COLUMN), new Object[]{11.0f, 12.0f});
+ assertEquals(record.getValue(DOUBLE_MV_COLUMN), new Object[]{13.0, 14.0});
+ assertEquals(record.getValue(STRING_MV_COLUMN), new Object[]{"15", "16"});
+
+ assertFalse(recordReader.hasNext());
+ }
+ }
+
@AfterClass
public void tearDown()
throws IOException {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]