This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2ba3839068 [format] Bump parquet from 1.13.1 to 1.15.1 (#5421)
2ba3839068 is described below
commit 2ba38390687dbcc489d4557e484b41301880269c
Author: yangjf2019 <[email protected]>
AuthorDate: Mon Apr 21 14:16:04 2025 +0800
[format] Bump parquet from 1.13.1 to 1.15.1 (#5421)
---
paimon-arrow/pom.xml | 37 ++++
paimon-core/pom.xml | 6 +
.../utils/PartitionStatisticsReporterTest.java | 11 +-
.../postgres/PostgresSyncTableActionITCase.java | 2 +-
.../flink/PrimaryKeyFileStoreTableITCase.java | 2 +-
.../flink/action/CompactDatabaseActionITCase.java | 6 +-
...nlySingleTableCompactionWorkerOperatorTest.java | 23 +--
.../paimon/flink/sink/CommitterOperatorTest.java | 17 +-
paimon-format/pom.xml | 8 +
.../apache/parquet/hadoop/ParquetFileReader.java | 222 ++++++++++++++-------
paimon-format/src/main/resources/META-INF/NOTICE | 14 +-
pom.xml | 6 +-
12 files changed, 246 insertions(+), 108 deletions(-)
diff --git a/paimon-arrow/pom.xml b/paimon-arrow/pom.xml
index 48a30f0b92..4823f8d47b 100644
--- a/paimon-arrow/pom.xml
+++ b/paimon-arrow/pom.xml
@@ -120,5 +120,42 @@ under the License.
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 9209b35122..e06183c7da 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -210,6 +210,12 @@ under the License.
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>parquet-hadoop</artifactId>
+ <groupId>org.apache.parquet</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
index ecc20dc686..ea1b504df4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
@@ -37,7 +37,6 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -46,6 +45,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link PartitionStatisticsReporter}. */
public class PartitionStatisticsReporterTest {
@@ -125,11 +126,11 @@ public class PartitionStatisticsReporterTest {
new PartitionStatisticsReporter(table, partitionHandler);
long time = 1729598544974L;
action.report("c1=a/", time);
- Assertions.assertThat(partitionParams).containsKey("c1=a/");
- Assertions.assertThat(partitionParams.get("c1=a/").toString())
+ assertThat(partitionParams).containsKey("c1=a/");
+ assertThat(partitionParams.get("c1=a/").toString())
.isEqualTo(
- "{spec={c1=a}, recordCount=1, fileSizeInBytes=591,
fileCount=1, lastFileCreationTime=1729598544974}");
+ "{spec={c1=a}, recordCount=1, fileSizeInBytes=662,
fileCount=1, lastFileCreationTime=1729598544974}");
action.close();
- Assertions.assertThat(closed).isTrue();
+ assertThat(closed).isTrue();
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
index 55403a90a9..8e0efd110b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
@@ -532,7 +532,7 @@ public class PostgresSyncTableActionITCase extends PostgresActionITCaseBase {
}
@Test
- @Timeout(60)
+ @Timeout(180)
public void testComputedColumn() throws Exception {
// the first round checks for table creation
// the second round checks for running the action on an existing table
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index d1ed5dbc84..117afd1213 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -66,7 +66,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
/** Tests for changelog table with primary keys. */
public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
- private static final int TIMEOUT = 180;
+ private static final int TIMEOUT = 480;
private static final Logger LOG =
LoggerFactory.getLogger(PrimaryKeyFileStoreTableITCase.class);
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 8d7be925ef..dd5b4c03ea 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -1052,7 +1052,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
waitUtil(
() -> snapshotManager.latestSnapshotId() == 11L,
- Duration.ofSeconds(60),
+ Duration.ofSeconds(240),
Duration.ofMillis(500));
jobClient.cancel();
@@ -1061,8 +1061,8 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
waitUtil(
() -> snapshotManager.earliestSnapshotId() == 9L,
- Duration.ofSeconds(60),
- Duration.ofMillis(200),
+ Duration.ofSeconds(240),
+ Duration.ofMillis(500),
"Failed to wait snapshot expiration success");
List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
index 2f907320a8..c30bfdc03a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
@@ -39,8 +39,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -49,10 +47,13 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
/** Tests for {@link AppendOnlySingleTableCompactionWorkerOperator}. */
public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTestBase {
- @RepeatedTest(10)
+ @Test
public void testAsyncCompactionWorks() throws Exception {
createTableDefault();
AppendOnlySingleTableCompactionWorkerOperator workerOperator =
@@ -73,7 +74,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
List<UnawareAppendCompactionTask> tasks = packTask(commitMessages, 5);
List<StreamRecord<UnawareAppendCompactionTask>> records =
tasks.stream().map(StreamRecord::new).collect(Collectors.toList());
- Assertions.assertThat(tasks.size()).isEqualTo(4);
+ assertThat(tasks.size()).isEqualTo(4);
workerOperator.open();
@@ -85,7 +86,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
Long timeStart = System.currentTimeMillis();
long timeout = 60_000L;
- Assertions.assertThatCode(
+ assertThatCode(
() -> {
while (committables.size() != 4) {
committables.addAll(
@@ -105,7 +106,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
.doesNotThrowAnyException();
committables.forEach(
a ->
- Assertions.assertThat(
+ assertThat(
((CommitMessageImpl) a.wrappedCommittable())
.compactIncrement()
.compactAfter()
@@ -140,7 +141,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
List<UnawareAppendCompactionTask> tasks = packTask(commitMessages, 5);
List<StreamRecord<UnawareAppendCompactionTask>> records =
tasks.stream().map(StreamRecord::new).collect(Collectors.toList());
- Assertions.assertThat(tasks.size()).isEqualTo(8);
+ assertThat(tasks.size()).isEqualTo(8);
workerOperator.open();
@@ -149,7 +150,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
}
// wait compaction
- Thread.sleep(500);
+ Thread.sleep(5000);
LocalFileIO localFileIO = LocalFileIO.create();
DataFilePathFactory dataFilePathFactory =
@@ -166,8 +167,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
List<DataFileMeta> fileMetas =
((CommitMessageImpl) commitMessage).compactIncrement().compactAfter();
for (DataFileMeta fileMeta : fileMetas) {
- Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta)))
- .isTrue();
+ assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta))).isTrue();
}
if (i++ > 2) {
break;
@@ -193,8 +193,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
List<DataFileMeta> fileMetas =
((CommitMessageImpl) commitMessage).compactIncrement().compactAfter();
for (DataFileMeta fileMeta : fileMetas) {
- Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta)))
- .isFalse();
+ assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta))).isFalse();
}
} catch (Exception e) {
// do nothing
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 41ccfbf79e..b5ce23ef50 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -58,7 +58,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -69,6 +68,7 @@ import java.util.UUID;
import static org.apache.paimon.SnapshotTest.newSnapshotManager;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.fail;
/** Tests for {@link CommitterOperator}. */
@@ -282,9 +282,8 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
testHarness1.initializeState(snapshot);
testHarness1.close();
- Assertions.assertThat(actual.size()).isEqualTo(1);
-
- Assertions.assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser));
+ assertThat(actual.size()).isEqualTo(1);
+ assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser));
}
@Test
@@ -325,7 +324,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
createTestHarness(operatorFactory);
testHarness.open();
- Assertions.assertThatCode(
+ assertThatCode(
() -> {
long time = System.currentTimeMillis();
long cp = 0L;
@@ -387,7 +386,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
.doesNotThrowAnyException();
if (operatorFactory instanceof CommitterOperator) {
- Assertions.assertThat(
+ assertThat(
((ManifestCommittable)
((CommitterOperator) operatorFactory)
.committablesPerCheckpoint.get(Long.MAX_VALUE))
@@ -396,7 +395,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
.isEqualTo(3);
}
- Assertions.assertThatCode(
+ assertThatCode(
() -> {
long time = System.currentTimeMillis();
long cp = 0L;
@@ -608,7 +607,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
Committer.createContext("", metricGroup, true, false, null, 1, 1));
committer.commit(Collections.singletonList(manifestCommittable));
CommitterMetrics metrics = committer.getCommitterMetrics();
- assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(533);
+ assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(572);
assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
committer.close();
}
@@ -705,7 +704,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
table, commitUser, new NoopCommittableStateManager());
try (OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
createTestHarness(operatorFactory, 10, 10, 3)) {
- Assertions.assertThatCode(testHarness::open)
+ assertThatCode(testHarness::open)
.hasMessage("Committer Operator parallelism in paimon MUST
be one.");
}
}
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index e141eac2aa..15a894938d 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -36,6 +36,7 @@ under the License.
<commons.pool.version>1.6</commons.pool.version>
<commons.lang3.version>3.12.0</commons.lang3.version>
<storage-api.version>2.8.1</storage-api.version>
+ <commons.io.version>2.16.1</commons.io.version>
</properties>
<dependencies>
@@ -168,6 +169,12 @@ under the License.
<version>${joda-time.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons.io.version}</version>
+ </dependency>
+
<!-- Avro End -->
<!-- Parquet Start -->
@@ -333,6 +340,7 @@ under the License.
<include>com.fasterxml.jackson.core:jackson-databind</include>
<include>com.fasterxml.jackson.core:jackson-annotations</include>
<include>org.apache.commons:commons-compress</include>
+ <include>commons-io:commons-io</include>
<!-- Parquet -->
<include>org.apache.parquet:parquet-hadoop</include>
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e480e11222..bebdba7670 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -20,14 +20,17 @@ package org.apache.parquet.hadoop;
import org.apache.paimon.format.parquet.ParquetInputFile;
import org.apache.paimon.format.parquet.ParquetInputStream;
-import org.apache.paimon.fs.FileRange;
import org.apache.paimon.fs.VectoredReadable;
import org.apache.paimon.utils.RoaringBitmap32;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.ByteBufferReleaser;
import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.ReusingByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
@@ -63,6 +66,7 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.hadoop.util.wrapped.io.FutureIO;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
@@ -71,10 +75,10 @@ import org.apache.parquet.internal.filter2.columnindex.RowRanges;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.ParquetFileRange;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
-import org.apache.yetus.audience.InterfaceAudience.Private;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,11 +98,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.BLOOMFILTER;
import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY;
@@ -118,11 +122,20 @@ public class ParquetFileReader implements Closeable {
private static final Logger LOG =
LoggerFactory.getLogger(ParquetFileReader.class);
+ public static final long HADOOP_VECTORED_READ_TIMEOUT_SECONDS = 300;
+
private final ParquetMetadataConverter converter;
private final CRC32 crc;
+ private final ReusingByteBufferAllocator crcAllocator;
+
+ public static final ParquetMetadata readFooter(
+ InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
+ ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
+ return readFooter(file, options, f, converter);
+ }
- private static ParquetMetadata readFooter(
+ private static final ParquetMetadata readFooter(
InputFile file,
ParquetReadOptions options,
SeekableInputStream f,
@@ -180,35 +193,39 @@ public class ParquetFileReader implements Closeable {
// Read all the footer bytes in one time to avoid multiple read operations,
// since it can be pretty time consuming for a single read operation in HDFS.
- ByteBuffer footerBytesBuffer = ByteBuffer.allocate(fileMetadataLength);
- f.readFully(footerBytesBuffer);
- LOG.debug("Finished to read all footer bytes.");
- footerBytesBuffer.flip();
- InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
-
- // Regular file, or encrypted file with plaintext footer
- if (!encryptedFooterMode) {
+ ByteBuffer footerBytesBuffer = options.getAllocator().allocate(fileMetadataLength);
+ try {
+ f.readFully(footerBytesBuffer);
+ LOG.debug("Finished to read all footer bytes.");
+ footerBytesBuffer.flip();
+ InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
+
+ // Regular file, or encrypted file with plaintext footer
+ if (!encryptedFooterMode) {
+ return converter.readParquetMetadata(
+ footerBytesStream,
+ options.getMetadataFilter(),
+ fileDecryptor,
+ false,
+ fileMetadataLength);
+ }
+
+ // Encrypted file with encrypted footer
+ if (null == fileDecryptor) {
+ throw new ParquetCryptoRuntimeException(
+ "Trying to read file with encrypted footer. No keys
available");
+ }
+ FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
+ fileDecryptor.setFileCryptoMetaData(
+ fileCryptoMetaData.getEncryption_algorithm(),
+ true,
+ fileCryptoMetaData.getKey_metadata());
+ // footer length is required only for signed plaintext footers
return converter.readParquetMetadata(
- footerBytesStream,
- options.getMetadataFilter(),
- fileDecryptor,
- false,
- fileMetadataLength);
- }
-
- // Encrypted file with encrypted footer
- if (null == fileDecryptor) {
- throw new ParquetCryptoRuntimeException(
- "Trying to read file with encrypted footer. No keys
available");
- }
- FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
- fileDecryptor.setFileCryptoMetaData(
- fileCryptoMetaData.getEncryption_algorithm(),
- true,
- fileCryptoMetaData.getKey_metadata());
- // footer length is required only for signed plaintext footers
- return converter.readParquetMetadata(
- footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
+ footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
+ } finally {
+ options.getAllocator().release(footerBytesBuffer);
+ }
}
protected final ParquetInputStream f;
@@ -268,7 +285,13 @@ public class ParquetFileReader implements Closeable {
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
paths.put(ColumnPath.get(col.getPath()), col);
}
- this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+ if (options.usePageChecksumVerification()) {
+ this.crc = new CRC32();
+ this.crcAllocator = ReusingByteBufferAllocator.strict(options.getAllocator());
+ } else {
+ this.crc = null;
+ this.crcAllocator = null;
+ }
}
private static <T> List<T> listWithNulls(int size) {
@@ -446,7 +469,7 @@ public class ParquetFileReader implements Closeable {
ColumnChunkPageReadStore rowGroup =
new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
// prepare the list of consecutive parts to read them in one scan
- List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+ List<ConsecutivePartList> allParts = new ArrayList<>();
ConsecutivePartList currentParts = null;
for (ColumnChunkMetaData mc : block.getColumns()) {
ColumnPath pathKey = mc.getPath();
@@ -466,6 +489,7 @@ public class ParquetFileReader implements Closeable {
// actually read all the chunks
ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
readAllPartsVectoredOrNormal(allParts, builder);
+ rowGroup.setReleaser(builder.releaser);
for (Chunk chunk : builder.build()) {
readChunkPages(chunk, block, rowGroup);
}
@@ -600,24 +624,25 @@ public class ParquetFileReader implements Closeable {
@SuppressWarnings("checkstyle:JavadocParagraph")
private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
throws IOException {
- List<FileRange> ranges = new ArrayList<>(allParts.size());
+
+ List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
long totalSize = 0;
for (ConsecutivePartList consecutiveChunks : allParts) {
final long len = consecutiveChunks.length;
- checkArgument(
+ Preconditions.checkArgument(
len < Integer.MAX_VALUE,
"Invalid length %s for vectored read operation. It must be
less than max integer value.",
len);
- ranges.add(FileRange.createFileRange(consecutiveChunks.offset, (int) len));
+ ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) len));
totalSize += len;
}
LOG.debug(
"Reading {} bytes of data with vectored IO in {} ranges",
totalSize, ranges.size());
// Request a vectored read;
- ((VectoredReadable) f.in()).readVectored(ranges);
+ f.readVectored(ranges, options.getAllocator());
int k = 0;
for (ConsecutivePartList consecutivePart : allParts) {
- FileRange currRange = ranges.get(k++);
+ ParquetFileRange currRange = ranges.get(k++);
consecutivePart.readFromVectoredRange(currRange, builder);
}
}
@@ -707,6 +732,7 @@ public class ParquetFileReader implements Closeable {
}
// actually read all the chunks
readAllPartsVectoredOrNormal(allParts, builder);
+ rowGroup.setReleaser(builder.releaser);
for (Chunk chunk : builder.build()) {
readChunkPages(chunk, block, rowGroup);
}
@@ -798,11 +824,11 @@ public class ParquetFileReader implements Closeable {
if (blockIndex < 0 || blockIndex >= blocks.size()) {
return null;
}
- return new DictionaryPageReader(this, blocks.get(blockIndex));
+ return new DictionaryPageReader(this, blocks.get(blockIndex), options.getAllocator());
}
public DictionaryPageReader getDictionaryReader(BlockMetaData block) {
- return new DictionaryPageReader(this, block);
+ return new DictionaryPageReader(this, block, options.getAllocator());
}
/**
@@ -888,10 +914,7 @@ public class ParquetFileReader implements Closeable {
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
- byte[] dictPageBytes = new byte[compressedPageSize];
- fin.readFully(dictPageBytes);
-
- BytesInput bin = BytesInput.from(dictPageBytes);
+ BytesInput bin = BytesInput.from(fin, compressedPageSize);
if (null != pageDecryptor) {
bin = BytesInput.from(pageDecryptor.decrypt(bin.toByteArray(), dictionaryPageAAD));
@@ -1090,6 +1113,7 @@ public class ParquetFileReader implements Closeable {
private ChunkDescriptor lastDescriptor;
private final long rowCount;
private SeekableInputStream f;
+ private final ByteBufferReleaser releaser = new ByteBufferReleaser(options.getAllocator());
public ChunkListBuilder(long rowCount) {
this.rowCount = rowCount;
@@ -1101,6 +1125,10 @@ public class ParquetFileReader implements Closeable {
this.f = f;
}
+ void addBuffersToRelease(List<ByteBuffer> toRelease) {
+ toRelease.forEach(releaser::releaseLater);
+ }
+
void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) {
map.computeIfAbsent(descriptor, d -> new ChunkData()).offsetIndex = offsetIndex;
}
@@ -1161,16 +1189,18 @@ public class ParquetFileReader implements Closeable {
* Calculate checksum of input bytes, throw decoding exception if it does not match the
* provided reference crc.
*/
- private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) {
+ private void verifyCrc(int referenceCrc, BytesInput bytes, String exceptionMsg) {
crc.reset();
- crc.update(bytes);
+ try (ByteBufferReleaser releaser = crcAllocator.getReleaser()) {
+ crc.update(bytes.toByteBuffer(releaser));
+ }
if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) {
throw new ParquetDecodingException(exceptionMsg);
}
}
/**
- * Read all of the pages in a given column chunk.
+ * Read all the pages in a given column chunk.
*
* @return the list of pages
*/
@@ -1237,7 +1267,7 @@ public class ParquetFileReader implements Closeable {
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
verifyCrc(
pageHeader.getCrc(),
- pageBytes.toByteArray(),
+ pageBytes,
"could not verify dictionary page
integrity, CRC checksum verification failed");
}
DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
@@ -1258,7 +1288,7 @@ public class ParquetFileReader implements Closeable {
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
verifyCrc(
pageHeader.getCrc(),
- pageBytes.toByteArray(),
+ pageBytes,
"could not verify page integrity, CRC
checksum verification failed");
}
DataPageV1 dataPageV1 =
@@ -1289,23 +1319,41 @@ public class ParquetFileReader implements Closeable {
compressedPageSize
- dataHeaderV2.getRepetition_levels_byte_length()
- dataHeaderV2.getDefinition_levels_byte_length();
- pagesInChunk.add(
+ final BytesInput repetitionLevels =
+ this.readAsBytesInput(
+ dataHeaderV2.getRepetition_levels_byte_length());
+ final BytesInput definitionLevels =
+ this.readAsBytesInput(
+ dataHeaderV2.getDefinition_levels_byte_length());
+ final BytesInput values = this.readAsBytesInput(dataSize);
+ if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+ pageBytes =
+ BytesInput.concat(repetitionLevels, definitionLevels, values);
+ verifyCrc(
+ pageHeader.getCrc(),
+ pageBytes,
+ "could not verify page integrity, CRC checksum verification failed");
+ }
+ DataPageV2 dataPageV2 =
new DataPageV2(
dataHeaderV2.getNum_rows(),
dataHeaderV2.getNum_nulls(),
dataHeaderV2.getNum_values(),
- this.readAsBytesInput(
- dataHeaderV2.getRepetition_levels_byte_length()),
- this.readAsBytesInput(
- dataHeaderV2.getDefinition_levels_byte_length()),
+ repetitionLevels,
+ definitionLevels,
converter.getEncoding(dataHeaderV2.getEncoding()),
- this.readAsBytesInput(dataSize),
+ values,
uncompressedPageSize,
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(),
dataHeaderV2.getStatistics(),
type),
- dataHeaderV2.isIs_compressed()));
+ dataHeaderV2.isIs_compressed());
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ dataPageV2.setCrc(pageHeader.getCrc());
+ }
+ pagesInChunk.add(dataPageV2);
valuesCountReadSoFar += dataHeaderV2.getNum_values();
++dataPageCountReadSoFar;
break;
@@ -1346,7 +1394,8 @@ public class ParquetFileReader implements Closeable {
pageBlockDecryptor,
aadPrefix,
rowGroupOrdinal,
- columnOrdinal);
+ columnOrdinal,
+ options);
}
private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
@@ -1528,11 +1577,14 @@ public class ParquetFileReader implements Closeable {
if (lastAllocationSize > 0) {
buffers.add(options.getAllocator().allocate(lastAllocationSize));
}
+ builder.addBuffersToRelease(buffers);
+ long readStart = System.nanoTime();
for (ByteBuffer buffer : buffers) {
f.readFully(buffer);
buffer.flip();
}
+ setReadMetrics(readStart, length);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
@@ -1542,24 +1594,60 @@ public class ParquetFileReader implements Closeable {
}
}
+ private void setReadMetrics(long startNs, long len) {
+ ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+ if (metricsCallback != null) {
+ long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
+ double sizeInMb = ((double) len) / (1024 * 1024);
+ double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L;
+ double throughput = sizeInMb / timeInSec;
+ LOG.debug(
+ "Parquet: File Read stats: Length: {} MB, Time: {} secs, throughput: {} MB/sec ",
+ sizeInMb,
+ timeInSec,
+ throughput);
+ metricsCallback.setDuration(
+ ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs);
+ metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length);
+ metricsCallback.setValueDouble(
+ ParquetFileReaderMetrics.ReadThroughput.name(), throughput);
+ }
+ }
+
/**
- * Populate data in a parquet file range from a vectored range.
+ * Populate data in a parquet file range from a vectored range; will block for up to {@link
+ * #HADOOP_VECTORED_READ_TIMEOUT_SECONDS} seconds.
*
* @param currRange range to populated.
* @param builder used to build chunk list to read the pages for the different columns.
* @throws IOException if there is an error while reading from the stream, including a
* timeout.
*/
- public void readFromVectoredRange(FileRange currRange, ChunkListBuilder builder)
+ public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder)
throws IOException {
- byte[] buffer;
+ ByteBuffer buffer;
+ final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
+ long readStart = System.nanoTime();
try {
- buffer = currRange.getData().get();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
+ LOG.debug(
+ "Waiting for vectored read to finish for range {} with
timeout {} seconds",
+ currRange,
+ timeoutSeconds);
+ buffer =
+ FutureIO.awaitFuture(
+ currRange.getDataReadFuture(), timeoutSeconds, TimeUnit.SECONDS);
+ setReadMetrics(readStart, currRange.getLength());
+ // report in a counter the data we just scanned
+ BenchmarkCounter.incrementBytesRead(currRange.getLength());
+ } catch (TimeoutException e) {
+ String error =
+ String.format(
+ "Timeout while fetching result for %s with
time limit %d seconds",
+ currRange, timeoutSeconds);
+ LOG.error(error, e);
+ throw new IOException(error, e);
}
-
- ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer));
+ ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
for (ChunkDescriptor descriptor : chunks) {
builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
}
diff --git a/paimon-format/src/main/resources/META-INF/NOTICE b/paimon-format/src/main/resources/META-INF/NOTICE
index 44e3ca97a9..9c8cbf4171 100644
--- a/paimon-format/src/main/resources/META-INF/NOTICE
+++ b/paimon-format/src/main/resources/META-INF/NOTICE
@@ -13,18 +13,18 @@ This project bundles the following dependencies under the Apache Software Licens
- commons-lang:commons-lang:2.6
- org.apache.commons:commons-lang3:3.12.0
-- org.apache.avro:avro:1.11.3
+- org.apache.avro:avro:1.11.4
- com.fasterxml.jackson.core:jackson-core:2.14.2
- com.fasterxml.jackson.core:jackson-databind:2.14.2
- com.fasterxml.jackson.core:jackson-annotations:2.14.2
- org.apache.commons:commons-compress:1.22
-- org.apache.parquet:parquet-hadoop:1.13.1
-- org.apache.parquet:parquet-column:1.13.1
-- org.apache.parquet:parquet-common:1.13.1
-- org.apache.parquet:parquet-encoding:1.13.1
-- org.apache.parquet:parquet-format-structures:1.13.1
-- org.apache.parquet:parquet-jackson:1.13.1
+- org.apache.parquet:parquet-hadoop:1.15.1
+- org.apache.parquet:parquet-column:1.15.1
+- org.apache.parquet:parquet-common:1.15.1
+- org.apache.parquet:parquet-encoding:1.15.1
+- org.apache.parquet:parquet-format-structures:1.15.1
+- org.apache.parquet:parquet-jackson:1.15.1
- commons-pool:commons-pool:1.6
This project bundles the following dependencies under the BSD license.
diff --git a/pom.xml b/pom.xml
index a65d95b9f7..87eb328824 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,7 +106,7 @@ under the License.
<flink.reuseForks>true</flink.reuseForks>
<testcontainers.version>1.19.1</testcontainers.version>
<iceberg.version>1.6.1</iceberg.version>
- <parquet.version>1.13.1</parquet.version>
+ <parquet.version>1.15.1</parquet.version>
<orc.version>1.9.2</orc.version>
<protobuf-java.version>3.19.6</protobuf-java.version>
<roaringbitmap.version>1.2.1</roaringbitmap.version>
@@ -133,7 +133,7 @@ under the License.
<jaxb.api.version>2.3.1</jaxb.api.version>
<findbugs.version>1.3.9</findbugs.version>
<json-smart.version>2.5.2</json-smart.version>
- <avro.version>1.11.3</avro.version>
+ <avro.version>1.11.4</avro.version>
<kafka.version>3.2.3</kafka.version>
<scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
<scalatest-maven-plugin.version>2.1.0</scalatest-maven-plugin.version>
@@ -979,7 +979,7 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>3.4.1</version>
+ <version>3.5.3</version>
</plugin>
<!-- configure scala style -->