This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 581478a61 [orc] update orc version to support schema evolution filter
push down (#2202)
581478a61 is described below
commit 581478a617f36f34b23757f6b4fff50fc7f052c2
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 31 12:30:15 2023 +0800
[orc] update orc version to support schema evolution filter push down
(#2202)
---
paimon-core/pom.xml | 6 +-
paimon-flink/paimon-flink-common/pom.xml | 12 +
.../paimon/flink/sink/CommitterOperatorTest.java | 2 +-
paimon-format/pom.xml | 13 +-
.../apache/paimon/format/orc/OrcFileFormat.java | 3 +-
.../apache/paimon/format/orc/OrcReaderFactory.java | 22 +-
.../apache/paimon/format/orc/OrcWriterFactory.java | 15 +-
.../format/orc/writer/PhysicalWriterImpl.java | 396 ---------------------
.../paimon/format/orc/OrcReaderFactoryTest.java | 6 +-
paimon-spark/paimon-spark-common/pom.xml | 16 +
10 files changed, 70 insertions(+), 421 deletions(-)
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 935ba024d..a43c9d99b 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -108,6 +108,10 @@ under the License.
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -175,7 +179,7 @@ under the License.
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
- <version>2.6.0</version>
+ <version>2.8.1</version>
<scope>test</scope>
</dependency>
diff --git a/paimon-flink/paimon-flink-common/pom.xml
b/paimon-flink/paimon-flink-common/pom.xml
index b1dcb26ba..1e3ee958b 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -88,6 +88,10 @@ under the License.
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -109,6 +113,10 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -152,6 +160,10 @@ under the License.
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
</exclusions>
</dependency>
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 8a9bf96e2..51b53e092 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -398,7 +398,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
StoreCommitter committer = new StoreCommitter(commit, metricGroup);
committer.commit(Collections.singletonList(manifestCommittable));
CommitterMetrics metrics = committer.getCommitterMetrics();
- assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275);
+ assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(285);
assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
committer.close();
}
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index 1cf8cfdb5..0bbb6b526 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -33,7 +33,7 @@ under the License.
<properties>
<parquet.version>1.12.3</parquet.version>
- <orc.version>1.5.6</orc.version>
+ <orc.version>1.8.3</orc.version>
<joda-time.version>2.5</joda-time.version>
<commons.pool.version>1.6</commons.pool.version>
<commons.lang3.version>3.12.0</commons.lang3.version>
@@ -76,6 +76,10 @@ under the License.
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -110,7 +114,6 @@ under the License.
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons.lang3.version}</version>
- <scope>provided</scope>
</dependency>
<!-- Orc End -->
@@ -279,7 +282,9 @@ under the License.
<include>org.apache.hive:hive-storage-api</include>
<include>io.airlift:aircompressor</include>
<include>commons-lang:commons-lang</include>
+ <include>org.apache.commons:commons-lang3</include>
<include>com.google.protobuf:protobuf-java</include>
+ <include>org.threeten:threeten-extra</include>
<!-- Avro -->
<include>org.apache.avro:avro</include>
@@ -334,6 +339,10 @@ under the License.
<pattern>com.google.protobuf</pattern>
<shadedPattern>org.apache.paimon.shade.com.google.protobuf</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.threeten.extra</pattern>
+ <shadedPattern>org.apache.paimon.shade.org.threeten.extra</shadedPattern>
+ </relocation>
<!-- Relocate Avro. -->
<relocation>
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 62957f24a..68178ae0f 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -110,8 +110,7 @@ public class OrcFileFormat extends FileFormat {
return new OrcReaderFactory(
readerConf,
- (RowType) refineDataType(type),
- Projection.of(projection).toTopLevelIndexes(),
+ Projection.of(projection).project((RowType) refineDataType(type)),
orcPredicates,
formatContext.readBatchSize());
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 02bb65802..bd3bd7898 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -64,28 +64,23 @@ public class OrcReaderFactory implements
FormatReaderFactory {
private final RowType tableType;
- protected final int[] selectedFields;
-
protected final List<OrcFilters.Predicate> conjunctPredicates;
protected final int batchSize;
/**
* @param hadoopConfig the hadoop config for orc reader.
- * @param selectedFields the read selected field of orc format.
* @param conjunctPredicates the filter predicates that can be evaluated.
* @param batchSize the batch size of orc reader.
*/
public OrcReaderFactory(
final org.apache.hadoop.conf.Configuration hadoopConfig,
- final RowType tableType,
- final int[] selectedFields,
+ final RowType readType,
final List<OrcFilters.Predicate> conjunctPredicates,
final int batchSize) {
this.hadoopConfigWrapper = new SerializableHadoopConfigWrapper(checkNotNull(hadoopConfig));
- this.schema = toOrcType(tableType);
- this.tableType = tableType;
- this.selectedFields = checkNotNull(selectedFields);
+ this.schema = toOrcType(readType);
+ this.tableType = readType;
this.conjunctPredicates = checkNotNull(conjunctPredicates);
this.batchSize = batchSize;
}
@@ -105,7 +100,6 @@ public class OrcReaderFactory implements
FormatReaderFactory {
createRecordReader(
hadoopConfigWrapper.getHadoopConfig(),
schema,
- selectedFields,
conjunctPredicates,
fileIO,
file,
@@ -126,10 +120,10 @@ public class OrcReaderFactory implements
FormatReaderFactory {
List<DataType> tableFieldTypes = tableType.getFieldTypes();
// create and initialize the row batch
- ColumnVector[] vectors = new ColumnVector[selectedFields.length];
+ ColumnVector[] vectors = new ColumnVector[tableType.getFieldCount()];
for (int i = 0; i < vectors.length; i++) {
- String name = tableFieldNames.get(selectedFields[i]);
- DataType type = tableFieldTypes.get(selectedFields[i]);
+ String name = tableFieldNames.get(i);
+ DataType type = tableFieldTypes.get(i);
vectors[i] =
createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
}
return new OrcReaderBatch(orcBatch, new VectorizedColumnBatch(vectors), recycler);
@@ -251,7 +245,6 @@ public class OrcReaderFactory implements
FormatReaderFactory {
private static RecordReader createRecordReader(
org.apache.hadoop.conf.Configuration conf,
TypeDescription schema,
- int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
FileIO fileIO,
org.apache.paimon.fs.Path path,
@@ -285,9 +278,6 @@ public class OrcReaderFactory implements
FormatReaderFactory {
options.searchArgument(b.build(), new String[] {});
}
- // configure selected fields
- options.include(computeProjectionMask(schema, selectedFields));
-
// create ORC row reader
RecordReader orcRowsReader = orcReader.rows(options);
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
index 7fd17d7a3..63f555653 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
@@ -24,15 +24,17 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.orc.writer.OrcBulkWriter;
-import org.apache.paimon.format.orc.writer.PhysicalWriterImpl;
import org.apache.paimon.format.orc.writer.Vectorizer;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
+import org.apache.orc.impl.PhysicalFsWriter;
import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.writer.WriterEncryptionVariant;
import java.io.IOException;
import java.util.HashMap;
@@ -106,7 +108,16 @@ public class OrcWriterFactory implements
FormatWriterFactory {
}
OrcFile.WriterOptions opts = getWriterOptions();
- opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+ opts.physicalWriter(
+ new PhysicalFsWriter(
+ new FSDataOutputStream(out, null) {
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+ },
+ opts,
+ new WriterEncryptionVariant[0]));
// The path of the Writer is not used to indicate the destination file
// in this case since we have used a dedicated physical writer to write
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
deleted file mode 100644
index 20112604e..000000000
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.orc.writer;
-
-import org.apache.paimon.fs.PositionOutputStream;
-
-import com.google.protobuf.CodedOutputStream;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.OrcProto;
-import org.apache.orc.PhysicalWriter;
-import org.apache.orc.impl.HadoopShims;
-import org.apache.orc.impl.OrcCodecPool;
-import org.apache.orc.impl.OutStream;
-import org.apache.orc.impl.StreamName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
-
-/**
- * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
- *
- * <p>Whereas PhysicalFsWriter implementation works on the basis of a Path,
this implementation
- * leverages Paimon's {@link PositionOutputStream} to write the compressed
data.
- *
- * <p>NOTE: If the ORC dependency version is updated, this file may have to be
updated as well to be
- * in sync with the new version's PhysicalFsWriter.
- */
-public class PhysicalWriterImpl implements PhysicalWriter {
-
- private static final Logger LOG =
LoggerFactory.getLogger(PhysicalWriterImpl.class);
- private static final byte[] ZEROS = new byte[64 * 1024];
- private static final int HDFS_BUFFER_SIZE = 256 * 1024;
-
- protected final OutStream writer;
- private final CodedOutputStream protobufWriter;
- private final CompressionKind compress;
- private final Map<StreamName, BufferedStream> streams;
- private final HadoopShims shims;
- private final int maxPadding;
- private final int bufferSize;
- private final long blockSize;
- private final boolean addBlockPadding;
- private final boolean writeVariableLengthBlocks;
-
- private CompressionCodec codec;
- private final PositionOutputStream out;
- private long headerLength;
- private long stripeStart;
- private long blockOffset;
- private int metadataLength;
- private int footerLength;
-
- public PhysicalWriterImpl(PositionOutputStream out, OrcFile.WriterOptions
opts)
- throws IOException {
- if (opts.isEnforceBufferSize()) {
- this.bufferSize = opts.getBufferSize();
- } else {
- this.bufferSize =
- getEstimatedBufferSize(
- opts.getStripeSize(),
- opts.getSchema().getMaximumId() + 1,
- opts.getBufferSize());
- }
-
- this.out = out;
- this.blockOffset = 0;
- this.blockSize = opts.getBlockSize();
- this.maxPadding = (int) (opts.getPaddingTolerance() * (double)
opts.getBufferSize());
- this.compress = opts.getCompress();
- this.codec = OrcCodecPool.getCodec(this.compress);
- this.streams = new TreeMap<>();
- this.writer =
- new OutStream("metadata", this.bufferSize, this.codec, new
DirectStream(this.out));
- this.shims = opts.getHadoopShims();
- this.addBlockPadding = opts.getBlockPadding();
- this.protobufWriter = CodedOutputStream.newInstance(this.writer);
- this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
- }
-
- @Override
- public void writeHeader() throws IOException {
- this.out.write("ORC".getBytes());
- this.headerLength = this.out.getPos();
- }
-
- @Override
- public OutputReceiver createDataStream(StreamName name) {
- BufferedStream result = streams.get(name);
-
- if (result == null) {
- result = new BufferedStream();
- streams.put(name, result);
- }
-
- return result;
- }
-
- @Override
- public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index,
CompressionCodec codec)
- throws IOException {
- OutputStream stream =
- new OutStream(this.toString(), bufferSize, codec,
createDataStream(name));
- index.build().writeTo(stream);
- stream.flush();
- }
-
- @Override
- public void writeBloomFilter(
- StreamName name, OrcProto.BloomFilterIndex.Builder bloom,
CompressionCodec codec)
- throws IOException {
- OutputStream stream =
- new OutStream(this.toString(), bufferSize, codec,
createDataStream(name));
- bloom.build().writeTo(stream);
- stream.flush();
- }
-
- @Override
- public void finalizeStripe(
- OrcProto.StripeFooter.Builder footerBuilder,
- OrcProto.StripeInformation.Builder dirEntry)
- throws IOException {
- long indexSize = 0;
- long dataSize = 0;
-
- for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
- BufferedStream receiver = pair.getValue();
- if (!receiver.isSuppressed) {
- long streamSize = receiver.getOutputSize();
- StreamName name = pair.getKey();
- footerBuilder.addStreams(
- OrcProto.Stream.newBuilder()
- .setColumn(name.getColumn())
- .setKind(name.getKind())
- .setLength(streamSize));
- if (StreamName.Area.INDEX == name.getArea()) {
- indexSize += streamSize;
- } else {
- dataSize += streamSize;
- }
- }
- }
-
- dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
- OrcProto.StripeFooter footer = footerBuilder.build();
- // Do we need to pad the file so the stripe doesn't straddle a block
boundary?
- padStripe(indexSize + dataSize + footer.getSerializedSize());
-
- // write out the data streams
- for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
- pair.getValue().spillToDiskAndClear(out);
- }
-
- // Write out the footer.
- writeStripeFooter(footer, dataSize, indexSize, dirEntry);
- }
-
- @Override
- public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws
IOException {
- long startPosition = out.getPos();
- OrcProto.Metadata metadata = builder.build();
- writeMetadata(metadata);
- this.metadataLength = (int) (out.getPos() - startPosition);
- }
-
- @Override
- public void writeFileFooter(OrcProto.Footer.Builder builder) throws
IOException {
- long bodyLength = out.getPos() - metadataLength;
- builder.setContentLength(bodyLength);
- builder.setHeaderLength(headerLength);
- long startPosition = out.getPos();
- OrcProto.Footer footer = builder.build();
- writeFileFooter(footer);
- this.footerLength = (int) (out.getPos() - startPosition);
- }
-
- @Override
- public long writePostScript(OrcProto.PostScript.Builder builder) throws
IOException {
- builder.setFooterLength(footerLength);
- builder.setMetadataLength(metadataLength);
-
- OrcProto.PostScript ps = builder.build();
- // need to write this uncompressed
- long startPosition = out.getPos();
- ps.writeTo(out);
- long length = out.getPos() - startPosition;
-
- if (length > 255) {
- throw new IllegalArgumentException("PostScript too large at " +
length);
- }
-
- out.write((int) length);
- return out.getPos();
- }
-
- @Override
- public void close() {
- // Just release the codec but don't close the internal stream here to
avoid
- // Stream Closed or ClosedChannelException when performs checkpoint.
- OrcCodecPool.returnCodec(compress, codec);
- codec = null;
- }
-
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- @Override
- public void appendRawStripe(ByteBuffer buffer,
OrcProto.StripeInformation.Builder dirEntry)
- throws IOException {
- long start = out.getPos();
- int length = buffer.remaining();
- long availBlockSpace = blockSize - (start % blockSize);
-
- // see if stripe can fit in the current hdfs block, else pad the
remaining
- // space in the block
- if (length < blockSize && length > availBlockSpace && addBlockPadding)
{
- byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE,
availBlockSpace)];
- LOG.info(String.format("Padding ORC by %d bytes while merging..",
availBlockSpace));
- start += availBlockSpace;
- while (availBlockSpace > 0) {
- int writeLen = (int) Math.min(availBlockSpace, pad.length);
- out.write(pad, 0, writeLen);
- availBlockSpace -= writeLen;
- }
- }
-
- out.write(buffer.array(), buffer.arrayOffset() + buffer.position(),
length);
- dirEntry.setOffset(start);
- }
-
- @Override
- public CompressionCodec getCompressionCodec() {
- return this.codec;
- }
-
- @Override
- public long getFileBytes(int column) {
- long size = 0;
-
- for (final Map.Entry<StreamName, BufferedStream> pair :
streams.entrySet()) {
- final BufferedStream receiver = pair.getValue();
- if (!receiver.isSuppressed) {
-
- final StreamName name = pair.getKey();
- if (name.getColumn() == column && name.getArea() !=
StreamName.Area.INDEX) {
- size += receiver.getOutputSize();
- }
- }
- }
-
- return size;
- }
-
- private void padStripe(long stripeSize) throws IOException {
- this.stripeStart = out.getPos();
- long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
-
- // We only have options if this isn't the first stripe in the block
- if (previousBytesInBlock > 0) {
- if (previousBytesInBlock + stripeSize >= blockSize) {
- // Try making a short block
- if (writeVariableLengthBlocks &&
shims.endVariableLengthBlock(out)) {
- blockOffset = stripeStart;
- } else if (addBlockPadding) {
- // if we cross the block boundary, figure out what we
should do
- long padding = blockSize - previousBytesInBlock;
- if (padding <= maxPadding) {
- writeZeros(out, padding);
- stripeStart += padding;
- }
- }
- }
- }
- }
-
- private void writeStripeFooter(
- OrcProto.StripeFooter footer,
- long dataSize,
- long indexSize,
- OrcProto.StripeInformation.Builder dirEntry)
- throws IOException {
- writeStripeFooter(footer);
-
- dirEntry.setOffset(stripeStart);
- dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize -
indexSize);
- }
-
- protected void writeMetadata(OrcProto.Metadata metadata) throws
IOException {
- metadata.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- }
-
- protected void writeFileFooter(OrcProto.Footer footer) throws IOException {
- footer.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- }
-
- protected void writeStripeFooter(OrcProto.StripeFooter footer) throws
IOException {
- footer.writeTo(protobufWriter);
- protobufWriter.flush();
- writer.flush();
- }
-
- private static void writeZeros(OutputStream output, long remaining) throws
IOException {
- while (remaining > 0) {
- long size = Math.min(ZEROS.length, remaining);
- output.write(ZEROS, 0, (int) size);
- remaining -= size;
- }
- }
-
- private static class DirectStream implements OutputReceiver {
- private final PositionOutputStream output;
-
- DirectStream(PositionOutputStream output) {
- this.output = output;
- }
-
- public void output(ByteBuffer buffer) throws IOException {
- this.output.write(
- buffer.array(), buffer.arrayOffset() + buffer.position(),
buffer.remaining());
- }
-
- public void suppress() {
- throw new UnsupportedOperationException("Can't suppress direct
stream");
- }
- }
-
- private static final class BufferedStream implements OutputReceiver {
- private boolean isSuppressed = false;
- private final List<ByteBuffer> output = new ArrayList<>();
-
- @Override
- public void output(ByteBuffer buffer) {
- if (!isSuppressed) {
- output.add(buffer);
- }
- }
-
- public void suppress() {
- isSuppressed = true;
- output.clear();
- }
-
- void spillToDiskAndClear(PositionOutputStream raw) throws IOException {
- if (!isSuppressed) {
- for (ByteBuffer buffer : output) {
- raw.write(
- buffer.array(),
- buffer.arrayOffset() + buffer.position(),
- buffer.remaining());
- }
- output.clear();
- }
- isSuppressed = false;
- }
-
- public long getOutputSize() {
- long result = 0;
- for (ByteBuffer buffer : output) {
- result += buffer.remaining();
- }
- return result;
- }
- }
-}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index a4c219dc1..3ffbaca07 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DecimalUtils;
+import org.apache.paimon.utils.Projection;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.BeforeAll;
@@ -175,7 +176,10 @@ class OrcReaderFactoryTest {
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates) {
return new OrcReaderFactory(
- new Configuration(), formatType, selectedFields, conjunctPredicates, BATCH_SIZE);
+ new Configuration(),
+ Projection.of(selectedFields).project(formatType),
+ conjunctPredicates,
+ BATCH_SIZE);
}
private RecordReader<InternalRow> createReader(OrcReaderFactory format,
Path split)
diff --git a/paimon-spark/paimon-spark-common/pom.xml
b/paimon-spark/paimon-spark-common/pom.xml
index 140d3c77d..2cb2326cf 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -76,6 +76,10 @@ under the License.
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-mapreduce</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -97,6 +101,10 @@ under the License.
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -119,6 +127,10 @@ under the License.
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-mapreduce</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -144,6 +156,10 @@ under the License.
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>