This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 581478a61 [orc] update orc version to support schema evolution filter 
push down (#2202)
581478a61 is described below

commit 581478a617f36f34b23757f6b4fff50fc7f052c2
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 31 12:30:15 2023 +0800

    [orc] update orc version to support schema evolution filter push down 
(#2202)
---
 paimon-core/pom.xml                                |   6 +-
 paimon-flink/paimon-flink-common/pom.xml           |  12 +
 .../paimon/flink/sink/CommitterOperatorTest.java   |   2 +-
 paimon-format/pom.xml                              |  13 +-
 .../apache/paimon/format/orc/OrcFileFormat.java    |   3 +-
 .../apache/paimon/format/orc/OrcReaderFactory.java |  22 +-
 .../apache/paimon/format/orc/OrcWriterFactory.java |  15 +-
 .../format/orc/writer/PhysicalWriterImpl.java      | 396 ---------------------
 .../paimon/format/orc/OrcReaderFactoryTest.java    |   6 +-
 paimon-spark/paimon-spark-common/pom.xml           |  16 +
 10 files changed, 70 insertions(+), 421 deletions(-)

diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 935ba024d..a43c9d99b 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -108,6 +108,10 @@ under the License.
                     <groupId>jdk.tools</groupId>
                     <artifactId>jdk.tools</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -175,7 +179,7 @@ under the License.
         <dependency>
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-storage-api</artifactId>
-            <version>2.6.0</version>
+            <version>2.8.1</version>
             <scope>test</scope>
         </dependency>
 
diff --git a/paimon-flink/paimon-flink-common/pom.xml 
b/paimon-flink/paimon-flink-common/pom.xml
index b1dcb26ba..1e3ee958b 100644
--- a/paimon-flink/paimon-flink-common/pom.xml
+++ b/paimon-flink/paimon-flink-common/pom.xml
@@ -88,6 +88,10 @@ under the License.
                     <groupId>jdk.tools</groupId>
                     <artifactId>jdk.tools</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -109,6 +113,10 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -152,6 +160,10 @@ under the License.
                     <groupId>jdk.tools</groupId>
                     <artifactId>jdk.tools</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 8a9bf96e2..51b53e092 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -398,7 +398,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         StoreCommitter committer = new StoreCommitter(commit, metricGroup);
         committer.commit(Collections.singletonList(manifestCommittable));
         CommitterMetrics metrics = committer.getCommitterMetrics();
-        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275);
+        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(285);
         assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
         committer.close();
     }
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index 1cf8cfdb5..0bbb6b526 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -33,7 +33,7 @@ under the License.
 
     <properties>
         <parquet.version>1.12.3</parquet.version>
-        <orc.version>1.5.6</orc.version>
+        <orc.version>1.8.3</orc.version>
         <joda-time.version>2.5</joda-time.version>
         <commons.pool.version>1.6</commons.pool.version>
         <commons.lang3.version>3.12.0</commons.lang3.version>
@@ -76,6 +76,10 @@ under the License.
                     <groupId>jdk.tools</groupId>
                     <artifactId>jdk.tools</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -110,7 +114,6 @@ under the License.
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
             <version>${commons.lang3.version}</version>
-            <scope>provided</scope>
         </dependency>
 
         <!-- Orc End -->
@@ -279,7 +282,9 @@ under the License.
                                     
<include>org.apache.hive:hive-storage-api</include>
                                     <include>io.airlift:aircompressor</include>
                                     
<include>commons-lang:commons-lang</include>
+                                    
<include>org.apache.commons:commons-lang3</include>
                                     
<include>com.google.protobuf:protobuf-java</include>
+                                    
<include>org.threeten:threeten-extra</include>
 
                                     <!-- Avro -->
                                     <include>org.apache.avro:avro</include>
@@ -334,6 +339,10 @@ under the License.
                                     <pattern>com.google.protobuf</pattern>
                                     
<shadedPattern>org.apache.paimon.shade.com.google.protobuf</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.threeten.extra</pattern>
+                                    
<shadedPattern>org.apache.paimon.shade.org.threeten.extra</shadedPattern>
+                                </relocation>
 
                                 <!-- Relocate Avro. -->
                                 <relocation>
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 62957f24a..68178ae0f 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -110,8 +110,7 @@ public class OrcFileFormat extends FileFormat {
 
         return new OrcReaderFactory(
                 readerConf,
-                (RowType) refineDataType(type),
-                Projection.of(projection).toTopLevelIndexes(),
+                Projection.of(projection).project((RowType) 
refineDataType(type)),
                 orcPredicates,
                 formatContext.readBatchSize());
     }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 02bb65802..bd3bd7898 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -64,28 +64,23 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
 
     private final RowType tableType;
 
-    protected final int[] selectedFields;
-
     protected final List<OrcFilters.Predicate> conjunctPredicates;
 
     protected final int batchSize;
 
     /**
      * @param hadoopConfig the hadoop config for orc reader.
-     * @param selectedFields the read selected field of orc format.
      * @param conjunctPredicates the filter predicates that can be evaluated.
      * @param batchSize the batch size of orc reader.
      */
     public OrcReaderFactory(
             final org.apache.hadoop.conf.Configuration hadoopConfig,
-            final RowType tableType,
-            final int[] selectedFields,
+            final RowType readType,
             final List<OrcFilters.Predicate> conjunctPredicates,
             final int batchSize) {
         this.hadoopConfigWrapper = new 
SerializableHadoopConfigWrapper(checkNotNull(hadoopConfig));
-        this.schema = toOrcType(tableType);
-        this.tableType = tableType;
-        this.selectedFields = checkNotNull(selectedFields);
+        this.schema = toOrcType(readType);
+        this.tableType = readType;
         this.conjunctPredicates = checkNotNull(conjunctPredicates);
         this.batchSize = batchSize;
     }
@@ -105,7 +100,6 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
                 createRecordReader(
                         hadoopConfigWrapper.getHadoopConfig(),
                         schema,
-                        selectedFields,
                         conjunctPredicates,
                         fileIO,
                         file,
@@ -126,10 +120,10 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
         List<DataType> tableFieldTypes = tableType.getFieldTypes();
 
         // create and initialize the row batch
-        ColumnVector[] vectors = new ColumnVector[selectedFields.length];
+        ColumnVector[] vectors = new ColumnVector[tableType.getFieldCount()];
         for (int i = 0; i < vectors.length; i++) {
-            String name = tableFieldNames.get(selectedFields[i]);
-            DataType type = tableFieldTypes.get(selectedFields[i]);
+            String name = tableFieldNames.get(i);
+            DataType type = tableFieldTypes.get(i);
             vectors[i] = 
createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type);
         }
         return new OrcReaderBatch(orcBatch, new 
VectorizedColumnBatch(vectors), recycler);
@@ -251,7 +245,6 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
     private static RecordReader createRecordReader(
             org.apache.hadoop.conf.Configuration conf,
             TypeDescription schema,
-            int[] selectedFields,
             List<OrcFilters.Predicate> conjunctPredicates,
             FileIO fileIO,
             org.apache.paimon.fs.Path path,
@@ -285,9 +278,6 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
                 options.searchArgument(b.build(), new String[] {});
             }
 
-            // configure selected fields
-            options.include(computeProjectionMask(schema, selectedFields));
-
             // create ORC row reader
             RecordReader orcRowsReader = orcReader.rows(options);
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
index 7fd17d7a3..63f555653 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
@@ -24,15 +24,17 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.orc.writer.OrcBulkWriter;
-import org.apache.paimon.format.orc.writer.PhysicalWriterImpl;
 import org.apache.paimon.format.orc.writer.Vectorizer;
 import org.apache.paimon.fs.PositionOutputStream;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
+import org.apache.orc.impl.PhysicalFsWriter;
 import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.writer.WriterEncryptionVariant;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -106,7 +108,16 @@ public class OrcWriterFactory implements 
FormatWriterFactory {
         }
 
         OrcFile.WriterOptions opts = getWriterOptions();
-        opts.physicalWriter(new PhysicalWriterImpl(out, opts));
+        opts.physicalWriter(
+                new PhysicalFsWriter(
+                        new FSDataOutputStream(out, null) {
+                            @Override
+                            public void close() throws IOException {
+                                // do nothing
+                            }
+                        },
+                        opts,
+                        new WriterEncryptionVariant[0]));
 
         // The path of the Writer is not used to indicate the destination file
         // in this case since we have used a dedicated physical writer to write
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
deleted file mode 100644
index 20112604e..000000000
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/PhysicalWriterImpl.java
+++ /dev/null
@@ -1,396 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.orc.writer;
-
-import org.apache.paimon.fs.PositionOutputStream;
-
-import com.google.protobuf.CodedOutputStream;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.OrcProto;
-import org.apache.orc.PhysicalWriter;
-import org.apache.orc.impl.HadoopShims;
-import org.apache.orc.impl.OrcCodecPool;
-import org.apache.orc.impl.OutStream;
-import org.apache.orc.impl.StreamName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
-
-/**
- * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
- *
- * <p>Whereas PhysicalFsWriter implementation works on the basis of a Path, 
this implementation
- * leverages Paimon's {@link PositionOutputStream} to write the compressed 
data.
- *
- * <p>NOTE: If the ORC dependency version is updated, this file may have to be 
updated as well to be
- * in sync with the new version's PhysicalFsWriter.
- */
-public class PhysicalWriterImpl implements PhysicalWriter {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(PhysicalWriterImpl.class);
-    private static final byte[] ZEROS = new byte[64 * 1024];
-    private static final int HDFS_BUFFER_SIZE = 256 * 1024;
-
-    protected final OutStream writer;
-    private final CodedOutputStream protobufWriter;
-    private final CompressionKind compress;
-    private final Map<StreamName, BufferedStream> streams;
-    private final HadoopShims shims;
-    private final int maxPadding;
-    private final int bufferSize;
-    private final long blockSize;
-    private final boolean addBlockPadding;
-    private final boolean writeVariableLengthBlocks;
-
-    private CompressionCodec codec;
-    private final PositionOutputStream out;
-    private long headerLength;
-    private long stripeStart;
-    private long blockOffset;
-    private int metadataLength;
-    private int footerLength;
-
-    public PhysicalWriterImpl(PositionOutputStream out, OrcFile.WriterOptions 
opts)
-            throws IOException {
-        if (opts.isEnforceBufferSize()) {
-            this.bufferSize = opts.getBufferSize();
-        } else {
-            this.bufferSize =
-                    getEstimatedBufferSize(
-                            opts.getStripeSize(),
-                            opts.getSchema().getMaximumId() + 1,
-                            opts.getBufferSize());
-        }
-
-        this.out = out;
-        this.blockOffset = 0;
-        this.blockSize = opts.getBlockSize();
-        this.maxPadding = (int) (opts.getPaddingTolerance() * (double) 
opts.getBufferSize());
-        this.compress = opts.getCompress();
-        this.codec = OrcCodecPool.getCodec(this.compress);
-        this.streams = new TreeMap<>();
-        this.writer =
-                new OutStream("metadata", this.bufferSize, this.codec, new 
DirectStream(this.out));
-        this.shims = opts.getHadoopShims();
-        this.addBlockPadding = opts.getBlockPadding();
-        this.protobufWriter = CodedOutputStream.newInstance(this.writer);
-        this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
-    }
-
-    @Override
-    public void writeHeader() throws IOException {
-        this.out.write("ORC".getBytes());
-        this.headerLength = this.out.getPos();
-    }
-
-    @Override
-    public OutputReceiver createDataStream(StreamName name) {
-        BufferedStream result = streams.get(name);
-
-        if (result == null) {
-            result = new BufferedStream();
-            streams.put(name, result);
-        }
-
-        return result;
-    }
-
-    @Override
-    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, 
CompressionCodec codec)
-            throws IOException {
-        OutputStream stream =
-                new OutStream(this.toString(), bufferSize, codec, 
createDataStream(name));
-        index.build().writeTo(stream);
-        stream.flush();
-    }
-
-    @Override
-    public void writeBloomFilter(
-            StreamName name, OrcProto.BloomFilterIndex.Builder bloom, 
CompressionCodec codec)
-            throws IOException {
-        OutputStream stream =
-                new OutStream(this.toString(), bufferSize, codec, 
createDataStream(name));
-        bloom.build().writeTo(stream);
-        stream.flush();
-    }
-
-    @Override
-    public void finalizeStripe(
-            OrcProto.StripeFooter.Builder footerBuilder,
-            OrcProto.StripeInformation.Builder dirEntry)
-            throws IOException {
-        long indexSize = 0;
-        long dataSize = 0;
-
-        for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
-            BufferedStream receiver = pair.getValue();
-            if (!receiver.isSuppressed) {
-                long streamSize = receiver.getOutputSize();
-                StreamName name = pair.getKey();
-                footerBuilder.addStreams(
-                        OrcProto.Stream.newBuilder()
-                                .setColumn(name.getColumn())
-                                .setKind(name.getKind())
-                                .setLength(streamSize));
-                if (StreamName.Area.INDEX == name.getArea()) {
-                    indexSize += streamSize;
-                } else {
-                    dataSize += streamSize;
-                }
-            }
-        }
-
-        dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
-        OrcProto.StripeFooter footer = footerBuilder.build();
-        // Do we need to pad the file so the stripe doesn't straddle a block 
boundary?
-        padStripe(indexSize + dataSize + footer.getSerializedSize());
-
-        // write out the data streams
-        for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
-            pair.getValue().spillToDiskAndClear(out);
-        }
-
-        // Write out the footer.
-        writeStripeFooter(footer, dataSize, indexSize, dirEntry);
-    }
-
-    @Override
-    public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws 
IOException {
-        long startPosition = out.getPos();
-        OrcProto.Metadata metadata = builder.build();
-        writeMetadata(metadata);
-        this.metadataLength = (int) (out.getPos() - startPosition);
-    }
-
-    @Override
-    public void writeFileFooter(OrcProto.Footer.Builder builder) throws 
IOException {
-        long bodyLength = out.getPos() - metadataLength;
-        builder.setContentLength(bodyLength);
-        builder.setHeaderLength(headerLength);
-        long startPosition = out.getPos();
-        OrcProto.Footer footer = builder.build();
-        writeFileFooter(footer);
-        this.footerLength = (int) (out.getPos() - startPosition);
-    }
-
-    @Override
-    public long writePostScript(OrcProto.PostScript.Builder builder) throws 
IOException {
-        builder.setFooterLength(footerLength);
-        builder.setMetadataLength(metadataLength);
-
-        OrcProto.PostScript ps = builder.build();
-        // need to write this uncompressed
-        long startPosition = out.getPos();
-        ps.writeTo(out);
-        long length = out.getPos() - startPosition;
-
-        if (length > 255) {
-            throw new IllegalArgumentException("PostScript too large at " + 
length);
-        }
-
-        out.write((int) length);
-        return out.getPos();
-    }
-
-    @Override
-    public void close() {
-        // Just release the codec but don't close the internal stream here to 
avoid
-        // Stream Closed or ClosedChannelException when performs checkpoint.
-        OrcCodecPool.returnCodec(compress, codec);
-        codec = null;
-    }
-
-    @Override
-    public void flush() throws IOException {
-        out.flush();
-    }
-
-    @Override
-    public void appendRawStripe(ByteBuffer buffer, 
OrcProto.StripeInformation.Builder dirEntry)
-            throws IOException {
-        long start = out.getPos();
-        int length = buffer.remaining();
-        long availBlockSpace = blockSize - (start % blockSize);
-
-        // see if stripe can fit in the current hdfs block, else pad the 
remaining
-        // space in the block
-        if (length < blockSize && length > availBlockSpace && addBlockPadding) 
{
-            byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, 
availBlockSpace)];
-            LOG.info(String.format("Padding ORC by %d bytes while merging..", 
availBlockSpace));
-            start += availBlockSpace;
-            while (availBlockSpace > 0) {
-                int writeLen = (int) Math.min(availBlockSpace, pad.length);
-                out.write(pad, 0, writeLen);
-                availBlockSpace -= writeLen;
-            }
-        }
-
-        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), 
length);
-        dirEntry.setOffset(start);
-    }
-
-    @Override
-    public CompressionCodec getCompressionCodec() {
-        return this.codec;
-    }
-
-    @Override
-    public long getFileBytes(int column) {
-        long size = 0;
-
-        for (final Map.Entry<StreamName, BufferedStream> pair : 
streams.entrySet()) {
-            final BufferedStream receiver = pair.getValue();
-            if (!receiver.isSuppressed) {
-
-                final StreamName name = pair.getKey();
-                if (name.getColumn() == column && name.getArea() != 
StreamName.Area.INDEX) {
-                    size += receiver.getOutputSize();
-                }
-            }
-        }
-
-        return size;
-    }
-
-    private void padStripe(long stripeSize) throws IOException {
-        this.stripeStart = out.getPos();
-        long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
-
-        // We only have options if this isn't the first stripe in the block
-        if (previousBytesInBlock > 0) {
-            if (previousBytesInBlock + stripeSize >= blockSize) {
-                // Try making a short block
-                if (writeVariableLengthBlocks && 
shims.endVariableLengthBlock(out)) {
-                    blockOffset = stripeStart;
-                } else if (addBlockPadding) {
-                    // if we cross the block boundary, figure out what we 
should do
-                    long padding = blockSize - previousBytesInBlock;
-                    if (padding <= maxPadding) {
-                        writeZeros(out, padding);
-                        stripeStart += padding;
-                    }
-                }
-            }
-        }
-    }
-
-    private void writeStripeFooter(
-            OrcProto.StripeFooter footer,
-            long dataSize,
-            long indexSize,
-            OrcProto.StripeInformation.Builder dirEntry)
-            throws IOException {
-        writeStripeFooter(footer);
-
-        dirEntry.setOffset(stripeStart);
-        dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize - 
indexSize);
-    }
-
-    protected void writeMetadata(OrcProto.Metadata metadata) throws 
IOException {
-        metadata.writeTo(protobufWriter);
-        protobufWriter.flush();
-        writer.flush();
-    }
-
-    protected void writeFileFooter(OrcProto.Footer footer) throws IOException {
-        footer.writeTo(protobufWriter);
-        protobufWriter.flush();
-        writer.flush();
-    }
-
-    protected void writeStripeFooter(OrcProto.StripeFooter footer) throws 
IOException {
-        footer.writeTo(protobufWriter);
-        protobufWriter.flush();
-        writer.flush();
-    }
-
-    private static void writeZeros(OutputStream output, long remaining) throws 
IOException {
-        while (remaining > 0) {
-            long size = Math.min(ZEROS.length, remaining);
-            output.write(ZEROS, 0, (int) size);
-            remaining -= size;
-        }
-    }
-
-    private static class DirectStream implements OutputReceiver {
-        private final PositionOutputStream output;
-
-        DirectStream(PositionOutputStream output) {
-            this.output = output;
-        }
-
-        public void output(ByteBuffer buffer) throws IOException {
-            this.output.write(
-                    buffer.array(), buffer.arrayOffset() + buffer.position(), 
buffer.remaining());
-        }
-
-        public void suppress() {
-            throw new UnsupportedOperationException("Can't suppress direct 
stream");
-        }
-    }
-
-    private static final class BufferedStream implements OutputReceiver {
-        private boolean isSuppressed = false;
-        private final List<ByteBuffer> output = new ArrayList<>();
-
-        @Override
-        public void output(ByteBuffer buffer) {
-            if (!isSuppressed) {
-                output.add(buffer);
-            }
-        }
-
-        public void suppress() {
-            isSuppressed = true;
-            output.clear();
-        }
-
-        void spillToDiskAndClear(PositionOutputStream raw) throws IOException {
-            if (!isSuppressed) {
-                for (ByteBuffer buffer : output) {
-                    raw.write(
-                            buffer.array(),
-                            buffer.arrayOffset() + buffer.position(),
-                            buffer.remaining());
-                }
-                output.clear();
-            }
-            isSuppressed = false;
-        }
-
-        public long getOutputSize() {
-            long result = 0;
-            for (ByteBuffer buffer : output) {
-                result += buffer.remaining();
-            }
-            return result;
-        }
-    }
-}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index a4c219dc1..3ffbaca07 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DecimalUtils;
+import org.apache.paimon.utils.Projection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.BeforeAll;
@@ -175,7 +176,10 @@ class OrcReaderFactoryTest {
             int[] selectedFields,
             List<OrcFilters.Predicate> conjunctPredicates) {
         return new OrcReaderFactory(
-                new Configuration(), formatType, selectedFields, 
conjunctPredicates, BATCH_SIZE);
+                new Configuration(),
+                Projection.of(selectedFields).project(formatType),
+                conjunctPredicates,
+                BATCH_SIZE);
     }
 
     private RecordReader<InternalRow> createReader(OrcReaderFactory format, 
Path split)
diff --git a/paimon-spark/paimon-spark-common/pom.xml 
b/paimon-spark/paimon-spark-common/pom.xml
index 140d3c77d..2cb2326cf 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -76,6 +76,10 @@ under the License.
                     <groupId>org.apache.orc</groupId>
                     <artifactId>orc-core</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -97,6 +101,10 @@ under the License.
                     <groupId>org.apache.orc</groupId>
                     <artifactId>orc-core</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
@@ -119,6 +127,10 @@ under the License.
                     <groupId>org.apache.orc</groupId>
                     <artifactId>orc-core</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -144,6 +156,10 @@ under the License.
                     <groupId>org.apache.orc</groupId>
                     <artifactId>orc-core</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>

Reply via email to