This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 363a69dad3c2 [FLINK-36652][Formats/ORC] Upgrade Apache ORC Version to 1.9.4 (#25711)
363a69dad3c2 is described below

commit 363a69dad3c22c9b7063f4eb9ba9ca17b930432a
Author: mzzx <33508807+dycc...@users.noreply.github.com>
AuthorDate: Mon Jan 20 20:29:02 2025 +0800

    [FLINK-36652][Formats/ORC] Upgrade Apache ORC Version to 1.9.4 (#25711)
---
 flink-formats/flink-orc-nohive/pom.xml             |  70 ++
 .../nohive/writer/NoHivePhysicalWriterImpl.java    |   2 +-
 .../OrcColumnarRowSplitReaderNoHiveTest.java       |   4 +-
 flink-formats/flink-orc/pom.xml                    |  36 +-
 .../flink/orc/writer/OrcBulkWriterFactory.java     |  23 +-
 .../flink/orc/writer/PhysicalWriterImpl.java       | 741 +++++++++++++++------
 .../flink/orc/OrcColumnarRowInputFormatTest.java   |  34 +-
 .../flink/orc/OrcColumnarRowSplitReaderTest.java   |  53 +-
 .../flink/orc/OrcFormatStatisticsReportTest.java   |   4 +-
 .../src/main/resources/META-INF/NOTICE             |  10 +-
 pom.xml                                            |   3 +-
 11 files changed, 734 insertions(+), 246 deletions(-)

diff --git a/flink-formats/flink-orc-nohive/pom.xml b/flink-formats/flink-orc-nohive/pom.xml
index e623f8cd31e6..0e3c9f70b90b 100644
--- a/flink-formats/flink-orc-nohive/pom.xml
+++ b/flink-formats/flink-orc-nohive/pom.xml
@@ -87,6 +87,76 @@ under the License.
                                        <groupId>org.slf4j</groupId>
                                        <artifactId>slf4j-reload4j</artifactId>
                                </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-api</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               <dependency>
+                       <groupId>com.google.protobuf</groupId>
+                       <artifactId>protobuf-java</artifactId>
+                       <version>${protoc.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hive</groupId>
+                       <artifactId>hive-storage-api</artifactId>
+                       <version>${storage-api.version}</version>
+                       <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-hdfs</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-common</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-api</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-common</artifactId>
+                       <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>ch.qos.reload4j</groupId>
+                                       <artifactId>reload4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-reload4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.google.protobuf</groupId>
+                                       <artifactId>protobuf-java</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <scope>provided</scope>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>ch.qos.reload4j</groupId>
+                                       <artifactId>reload4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-reload4j</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>com.google.protobuf</groupId>
+                                       <artifactId>protobuf-java</artifactId>
+                               </exclusion>
                        </exclusions>
                </dependency>
 
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
index 0734e9bf90bc..30c417111cb3 100644
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
@@ -21,9 +21,9 @@ package org.apache.flink.orc.nohive.writer;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.orc.writer.PhysicalWriterImpl;
 
-import com.google.protobuf25.CodedOutputStream;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
+import org.apache.orc.protobuf.CodedOutputStream;
 
 import java.io.IOException;
 
diff --git a/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java b/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java
index 425ae3105a3c..de2f4fd8662f 100644
--- a/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java
+++ b/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Map;
-import java.util.stream.IntStream;
 
 /** Test for {@link OrcColumnarRowSplitReader}. */
 class OrcColumnarRowSplitReaderNoHiveTest extends OrcColumnarRowSplitReaderTest {
@@ -100,12 +99,13 @@ class OrcColumnarRowSplitReaderNoHiveTest extends OrcColumnarRowSplitReaderTest
     protected OrcColumnarRowSplitReader createReader(
             int[] selectedFields,
             DataType[] fullTypes,
+            String[] fullNames,
             Map<String, Object> partitionSpec,
             FileInputSplit split)
             throws IOException {
         return OrcNoHiveSplitReaderUtil.genPartColumnarRowReader(
                 new Configuration(),
-                IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
+                fullNames,
                 fullTypes,
                 partitionSpec,
                 selectedFields,
diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index 3f92726ba62c..d5176a8a036c 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -101,7 +101,11 @@ under the License.
                                </exclusion>
                                <exclusion>
                                        <groupId>org.slf4j</groupId>
-                                       <artifactId>slf4j-reload4j</artifactId>
+                                       <artifactId>slf4j-api</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.apache.commons</groupId>
+                                       <artifactId>commons-lang3</artifactId>
                                </exclusion>
                        </exclusions>
                </dependency>
@@ -125,6 +129,10 @@ under the License.
                                        <groupId>org.slf4j</groupId>
                                        <artifactId>slf4j-reload4j</artifactId>
                                </exclusion>
+                               <exclusion>
+                                       <groupId>com.google.protobuf</groupId>
+                                       <artifactId>protobuf-java</artifactId>
+                               </exclusion>
                        </exclusions>
                </dependency>
 
@@ -141,6 +149,32 @@ under the License.
                                        <groupId>org.slf4j</groupId>
                                        <artifactId>slf4j-reload4j</artifactId>
                                </exclusion>
+                               <exclusion>
+                                       <groupId>com.google.protobuf</groupId>
+                                       <artifactId>protobuf-java</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <dependency>
+                       <groupId>com.google.protobuf</groupId>
+                       <artifactId>protobuf-java</artifactId>
+                       <version>${protoc.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.hive</groupId>
+                       <artifactId>hive-storage-api</artifactId>
+                       <version>${storage-api.version}</version>
+                       <exclusions>
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-hdfs</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.slf4j</groupId>
+                                       <artifactId>slf4j-api</artifactId>
+                               </exclusion>
                        </exclusions>
                </dependency>
 
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
index 5e4310107ab4..010deb824a09 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcFile;
 import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.writer.WriterEncryptionVariant;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -96,14 +98,29 @@ public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
     @Override
     public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
         OrcFile.WriterOptions opts = getWriterOptions();
-        opts.physicalWriter(new PhysicalWriterImpl(out, opts));
-
+        PhysicalWriterImpl physicalWriter = new PhysicalWriterImpl(out, opts);
+        opts.physicalWriter(physicalWriter);
         // The path of the Writer is not used to indicate the destination file
         // in this case since we have used a dedicated physical writer to write
         // to the given output stream directly. However, the path would be used as
         // the key of writer in the ORC memory manager, thus we need to make it unique.
         Path unusedPath = new Path(UUID.randomUUID().toString());
-        return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts));
+        WriterImpl writer = new WriterImpl(null, unusedPath, opts);
+
+        // Obtaining encryption variant from Writer, and setting encryption variant for
+        // physicalWriter.
+        try {
+            Field encryptionFiled = WriterImpl.class.getDeclaredField("encryption");
+            encryptionFiled.setAccessible(true);
+            WriterEncryptionVariant[] encryption =
+                    (WriterEncryptionVariant[]) encryptionFiled.get(writer);
+            physicalWriter.setEncryptionVariant(encryption);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(
+                    "Can not access to the encryption field in Class 
org.apache.orc.impl.WriterImpl",
+                    e);
+        }
+        return new OrcBulkWriter<>(vectorizer, writer);
     }
 
     @VisibleForTesting
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
index 56e16c99c0a3..7dd8b3789649 100644
--- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java
@@ -21,16 +21,24 @@ package org.apache.flink.orc.writer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataOutputStream;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedOutputStream;
 import org.apache.orc.CompressionCodec;
-import org.apache.orc.CompressionKind;
+import org.apache.orc.EncryptionVariant;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.PhysicalWriter;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.CryptoUtils;
 import org.apache.orc.impl.HadoopShims;
 import org.apache.orc.impl.OrcCodecPool;
 import org.apache.orc.impl.OutStream;
+import org.apache.orc.impl.SerializationUtils;
 import org.apache.orc.impl.StreamName;
+import org.apache.orc.impl.WriterImpl;
+import org.apache.orc.impl.writer.StreamOptions;
+import org.apache.orc.impl.writer.WriterEncryptionKey;
+import org.apache.orc.impl.writer.WriterEncryptionVariant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,12 +46,11 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
-
 /**
  * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}.
  *
@@ -55,247 +62,227 @@ import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize;
  */
 @Internal
 public class PhysicalWriterImpl implements PhysicalWriter {
-
     private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class);
-    private static final byte[] ZEROS = new byte[64 * 1024];
+
     private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+    private static final byte[] ZEROS = new byte[64 * 1024];
+
+    private FSDataOutputStream rawWriter;
+    private final DirectStream rawStream;
 
     protected final OutStream writer;
-    private final CodedOutputStream protobufWriter;
-    private final CompressionKind compress;
-    private final Map<StreamName, BufferedStream> streams;
+    private final CodedOutputStream codedCompressStream;
+
     private final HadoopShims shims;
-    private final int maxPadding;
-    private final int bufferSize;
     private final long blockSize;
+    private final int maxPadding;
+    private final StreamOptions compress;
+    private final OrcFile.CompressionStrategy compressionStrategy;
     private final boolean addBlockPadding;
     private final boolean writeVariableLengthBlocks;
+    private final VariantTracker unencrypted;
 
-    private CompressionCodec codec;
-    private FSDataOutputStream out;
     private long headerLength;
     private long stripeStart;
+
     private long blockOffset;
     private int metadataLength;
+    private int stripeStatisticsLength = 0;
     private int footerLength;
+    private int stripeNumber = 0;
+
+    private final Map<WriterEncryptionVariant, VariantTracker> variants = new TreeMap<>();
 
     public PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts)
             throws IOException {
+        this(out, opts, new WriterEncryptionVariant[0]);
+    }
+
+    public PhysicalWriterImpl(
+            FSDataOutputStream out,
+            OrcFile.WriterOptions opts,
+            WriterEncryptionVariant[] encryption)
+            throws IOException {
+        this.rawWriter = out;
+        long defaultStripeSize = opts.getStripeSize();
+        this.addBlockPadding = opts.getBlockPadding();
         if (opts.isEnforceBufferSize()) {
-            this.bufferSize = opts.getBufferSize();
+            this.compress = new StreamOptions(opts.getBufferSize());
         } else {
-            this.bufferSize =
-                    getEstimatedBufferSize(
-                            opts.getStripeSize(),
-                            opts.getSchema().getMaximumId() + 1,
-                            opts.getBufferSize());
+            this.compress =
+                    new StreamOptions(
+                            WriterImpl.getEstimatedBufferSize(
+                                    defaultStripeSize,
+                                    opts.getSchema().getMaximumId() + 1,
+                                    opts.getBufferSize()));
         }
-
-        this.out = out;
-        this.blockOffset = 0;
+        CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress());
+        if (codec != null) {
+            compress.withCodec(codec, codec.getDefaultOptions());
+        }
+        this.compressionStrategy = opts.getCompressionStrategy();
+        this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
         this.blockSize = opts.getBlockSize();
-        this.maxPadding = (int) (opts.getPaddingTolerance() * (double) opts.getBufferSize());
-        this.compress = opts.getCompress();
-        this.codec = OrcCodecPool.getCodec(this.compress);
-        this.streams = new TreeMap<>();
-        this.writer =
-                new OutStream("metadata", this.bufferSize, this.codec, new 
DirectStream(this.out));
-        this.shims = opts.getHadoopShims();
-        this.addBlockPadding = opts.getBlockPadding();
-        this.protobufWriter = CodedOutputStream.newInstance(this.writer);
-        this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
+        blockOffset = 0;
+        unencrypted = new VariantTracker(opts.getSchema(), compress);
+        writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
+        shims = opts.getHadoopShims();
+        rawStream = new DirectStream(rawWriter);
+        writer = new OutStream("stripe footer", compress, rawStream);
+        codedCompressStream = CodedOutputStream.newInstance(writer);
+        for (WriterEncryptionVariant variant : encryption) {
+            WriterEncryptionKey key = variant.getKeyDescription();
+            StreamOptions encryptOptions =
+                    new StreamOptions(unencrypted.options)
+                            .withEncryption(key.getAlgorithm(), variant.getFileFooterKey());
+            variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions));
+        }
     }
 
-    @Override
-    public void writeHeader() throws IOException {
-        this.out.write("ORC".getBytes());
-        this.headerLength = this.out.getPos();
+    public void setEncryptionVariant(WriterEncryptionVariant[] encryption) {
+        if (encryption == null) {
+            return;
+        }
+        for (WriterEncryptionVariant variant : encryption) {
+            WriterEncryptionKey key = variant.getKeyDescription();
+            StreamOptions encryptOptions =
+                    new StreamOptions(unencrypted.options)
+                            .withEncryption(key.getAlgorithm(), variant.getFileFooterKey());
+            variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions));
+        }
     }
 
-    @Override
-    public OutputReceiver createDataStream(StreamName name) throws IOException {
-        BufferedStream result = streams.get(name);
+    protected static class VariantTracker {
+        // the streams that make up the current stripe
+        protected final Map<StreamName, BufferedStream> streams = new TreeMap<>();
+        private final int rootColumn;
+        private final int lastColumn;
+        protected final StreamOptions options;
+        // a list for each column covered by this variant
+        // the elements in the list correspond to each stripe in the file
+        protected final List<OrcProto.ColumnStatistics>[] stripeStats;
+        protected final List<OrcProto.Stream> stripeStatsStreams = new ArrayList<>();
+        protected final OrcProto.ColumnStatistics[] fileStats;
+
+        VariantTracker(TypeDescription schema, StreamOptions options) {
+            rootColumn = schema.getId();
+            lastColumn = schema.getMaximumId();
+            this.options = options;
+            stripeStats = new List[schema.getMaximumId() - schema.getId() + 1];
+            for (int i = 0; i < stripeStats.length; ++i) {
+                stripeStats[i] = new ArrayList<>();
+            }
+            fileStats = new OrcProto.ColumnStatistics[stripeStats.length];
+        }
 
-        if (result == null) {
-            result = new BufferedStream();
+        public BufferedStream createStream(StreamName name) {
+            BufferedStream result = new BufferedStream();
             streams.put(name, result);
+            return result;
         }
 
-        return result;
-    }
-
-    @Override
-    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec)
-            throws IOException {
-        OutputStream stream =
-                new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
-        index.build().writeTo(stream);
-        stream.flush();
-    }
-
-    @Override
-    public void writeBloomFilter(
-            StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec)
-            throws IOException {
-        OutputStream stream =
-                new OutStream(this.toString(), bufferSize, codec, createDataStream(name));
-        bloom.build().writeTo(stream);
-        stream.flush();
-    }
-
-    @Override
-    public void finalizeStripe(
-            OrcProto.StripeFooter.Builder footerBuilder,
-            OrcProto.StripeInformation.Builder dirEntry)
-            throws IOException {
-        long indexSize = 0;
-        long dataSize = 0;
-
-        for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
-            BufferedStream receiver = pair.getValue();
-            if (!receiver.isSuppressed) {
-                long streamSize = receiver.getOutputSize();
-                StreamName name = pair.getKey();
-                footerBuilder.addStreams(
-                        OrcProto.Stream.newBuilder()
-                                .setColumn(name.getColumn())
-                                .setKind(name.getKind())
-                                .setLength(streamSize));
-                if (StreamName.Area.INDEX == name.getArea()) {
-                    indexSize += streamSize;
-                } else {
-                    dataSize += streamSize;
+        /**
+         * Place the streams in the appropriate area while updating the sizes with the number of
+         * bytes in the area.
+         *
+         * @param area the area to write
+         * @param sizes the sizes of the areas
+         * @return the list of stream descriptions to add
+         */
+        public List<OrcProto.Stream> placeStreams(StreamName.Area area, SizeCounters sizes) {
+            List<OrcProto.Stream> result = new ArrayList<>(streams.size());
+            for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) {
+                StreamName name = stream.getKey();
+                BufferedStream bytes = stream.getValue();
+                if (name.getArea() == area && !bytes.isSuppressed) {
+                    OrcProto.Stream.Builder builder = OrcProto.Stream.newBuilder();
+                    long size = bytes.getOutputSize();
+                    if (area == StreamName.Area.INDEX) {
+                        sizes.index += size;
+                    } else {
+                        sizes.data += size;
+                    }
+                    builder.setColumn(name.getColumn()).setKind(name.getKind()).setLength(size);
+                    result.add(builder.build());
                 }
             }
+            return result;
         }
 
-        dirEntry.setIndexLength(indexSize).setDataLength(dataSize);
-        OrcProto.StripeFooter footer = footerBuilder.build();
-        // Do we need to pad the file so the stripe doesn't straddle a block boundary?
-        padStripe(indexSize + dataSize + footer.getSerializedSize());
-
-        // write out the data streams
-        for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
-            pair.getValue().spillToDiskAndClear(out);
+        /**
+         * Write the streams in the appropriate area.
+         *
+         * @param area the area to write
+         * @param raw the raw stream to write to
+         */
+        public void writeStreams(StreamName.Area area, FSDataOutputStream raw) throws IOException {
+            for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) {
+                if (stream.getKey().getArea() == area) {
+                    stream.getValue().spillToDiskAndClear(raw);
+                }
+            }
         }
 
-        // Write out the footer.
-        writeStripeFooter(footer, dataSize, indexSize, dirEntry);
-    }
-
-    @Override
-    public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
-        long startPosition = out.getPos();
-        OrcProto.Metadata metadata = builder.build();
-        writeMetadata(metadata);
-        this.metadataLength = (int) (out.getPos() - startPosition);
-    }
-
-    @Override
-    public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
-        long bodyLength = out.getPos() - metadataLength;
-        builder.setContentLength(bodyLength);
-        builder.setHeaderLength(headerLength);
-        long startPosition = out.getPos();
-        OrcProto.Footer footer = builder.build();
-        writeFileFooter(footer);
-        this.footerLength = (int) (out.getPos() - startPosition);
-    }
-
-    @Override
-    public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
-        builder.setFooterLength(footerLength);
-        builder.setMetadataLength(metadataLength);
-
-        OrcProto.PostScript ps = builder.build();
-        // need to write this uncompressed
-        long startPosition = out.getPos();
-        ps.writeTo(out);
-        long length = out.getPos() - startPosition;
-
-        if (length > 255) {
-            throw new IllegalArgumentException("PostScript too large at " + length);
+        /**
+         * Computed the size of the given column on disk for this stripe. It excludes the index
+         * streams.
+         *
+         * @param column a column id
+         * @return the total number of bytes
+         */
+        public long getFileBytes(int column) {
+            long result = 0;
+            if (column >= rootColumn && column <= lastColumn) {
+                for (Map.Entry<StreamName, BufferedStream> entry : streams.entrySet()) {
+                    StreamName name = entry.getKey();
+                    if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
+                        result += entry.getValue().getOutputSize();
+                    }
+                }
+            }
+            return result;
         }
-
-        out.write((int) length);
-        return out.getPos();
-    }
-
-    @Override
-    public void close() {
-        // Just release the codec but don't close the internal stream here to avoid
-        // Stream Closed or ClosedChannelException when Flink performs checkpoint.
-        OrcCodecPool.returnCodec(compress, codec);
-        codec = null;
     }
 
-    @Override
-    public void flush() throws IOException {
-        out.flush();
-    }
-
-    @Override
-    public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
-            throws IOException {
-        long start = out.getPos();
-        int length = buffer.remaining();
-        long availBlockSpace = blockSize - (start % blockSize);
-
-        // see if stripe can fit in the current hdfs block, else pad the remaining
-        // space in the block
-        if (length < blockSize && length > availBlockSpace && addBlockPadding) {
-            byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
-            LOG.info(String.format("Padding ORC by %d bytes while merging..", availBlockSpace));
-            start += availBlockSpace;
-            while (availBlockSpace > 0) {
-                int writeLen = (int) Math.min(availBlockSpace, pad.length);
-                out.write(pad, 0, writeLen);
-                availBlockSpace -= writeLen;
-            }
+    VariantTracker getVariant(EncryptionVariant column) {
+        if (column == null) {
+            return unencrypted;
         }
-
-        out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
-        dirEntry.setOffset(start);
+        return variants.get(column);
     }
 
     @Override
-    public CompressionCodec getCompressionCodec() {
-        return this.codec;
+    public long getFileBytes(int column, WriterEncryptionVariant variant) {
+        return getVariant(variant).getFileBytes(column);
     }
 
     @Override
-    public long getFileBytes(int column) {
-        long size = 0;
-
-        for (final Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) {
-            final BufferedStream receiver = pair.getValue();
-            if (!receiver.isSuppressed) {
+    public StreamOptions getStreamOptions() {
+        return unencrypted.options;
+    }
 
-                final StreamName name = pair.getKey();
-                if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) {
-                    size += receiver.getOutputSize();
-                }
-            }
+    private static void writeZeros(OutputStream output, long remaining) throws IOException {
+        while (remaining > 0) {
+            long size = Math.min(ZEROS.length, remaining);
+            output.write(ZEROS, 0, (int) size);
+            remaining -= size;
         }
-
-        return size;
     }
 
     private void padStripe(long stripeSize) throws IOException {
-        this.stripeStart = out.getPos();
+        this.stripeStart = rawWriter.getPos();
         long previousBytesInBlock = (stripeStart - blockOffset) % blockSize;
-
         // We only have options if this isn't the first stripe in the block
         if (previousBytesInBlock > 0) {
             if (previousBytesInBlock + stripeSize >= blockSize) {
                 // Try making a short block
-                if (writeVariableLengthBlocks && shims.endVariableLengthBlock(out)) {
+                if (writeVariableLengthBlocks && shims.endVariableLengthBlock(rawWriter)) {
                     blockOffset = stripeStart;
                 } else if (addBlockPadding) {
                     // if we cross the block boundary, figure out what we should do
                     long padding = blockSize - previousBytesInBlock;
                     if (padding <= maxPadding) {
-                        writeZeros(out, padding);
+                        writeZeros(rawWriter, padding);
                         stripeStart += padding;
                     }
                 }
@@ -303,62 +290,225 @@ public class PhysicalWriterImpl implements PhysicalWriter {
         }
     }
 
+    private static class DirectStream implements OutputReceiver {
+        private final FSDataOutputStream output;
+
+        DirectStream(FSDataOutputStream output) {
+            this.output = output;
+        }
+
+        @Override
+        public void output(ByteBuffer buffer) throws IOException {
+            output.write(
+                    buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+        }
+
+        @Override
+        public void suppress() {
+            throw new UnsupportedOperationException("Can't suppress direct 
stream");
+        }
+    }
+
     private void writeStripeFooter(
             OrcProto.StripeFooter footer,
-            long dataSize,
-            long indexSize,
+            SizeCounters sizes,
             OrcProto.StripeInformation.Builder dirEntry)
             throws IOException {
         writeStripeFooter(footer);
-
         dirEntry.setOffset(stripeStart);
-        dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize - indexSize);
+        dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - sizes.total());
     }
 
     protected void writeMetadata(OrcProto.Metadata metadata) throws IOException {
-        metadata.writeTo(protobufWriter);
-        protobufWriter.flush();
+        metadata.writeTo(codedCompressStream);
+        codedCompressStream.flush();
         writer.flush();
     }
 
     protected void writeFileFooter(OrcProto.Footer footer) throws IOException {
-        footer.writeTo(protobufWriter);
-        protobufWriter.flush();
+        footer.writeTo(codedCompressStream);
+        codedCompressStream.flush();
         writer.flush();
     }
 
     protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException {
-        footer.writeTo(protobufWriter);
-        protobufWriter.flush();
+        footer.writeTo(codedCompressStream);
+        codedCompressStream.flush();
         writer.flush();
     }
 
-    private static void writeZeros(OutputStream output, long remaining) throws IOException {
-        while (remaining > 0) {
-            long size = Math.min(ZEROS.length, remaining);
-            output.write(ZEROS, 0, (int) size);
-            remaining -= size;
+    static void writeEncryptedStripeStatistics(
+            DirectStream output, int stripeNumber, VariantTracker tracker) throws IOException {
+        StreamOptions options = new StreamOptions(tracker.options);
+        tracker.stripeStatsStreams.clear();
+        for (int col = tracker.rootColumn;
+                col < tracker.rootColumn + tracker.stripeStats.length;
+                ++col) {
+            options.modifyIv(
+                    CryptoUtils.modifyIvForStream(
+                            col, OrcProto.Stream.Kind.STRIPE_STATISTICS, stripeNumber + 1));
+            OutStream stream = new OutStream("stripe stats for " + col, 
options, output);
+            OrcProto.ColumnarStripeStatistics stats =
+                    OrcProto.ColumnarStripeStatistics.newBuilder()
+                            .addAllColStats(tracker.stripeStats[col - tracker.rootColumn])
+                            .build();
+            long start = output.output.getPos();
+            stats.writeTo(stream);
+            stream.flush();
+            OrcProto.Stream description =
+                    OrcProto.Stream.newBuilder()
+                            .setColumn(col)
+                            .setKind(OrcProto.Stream.Kind.STRIPE_STATISTICS)
+                            .setLength(output.output.getPos() - start)
+                            .build();
+            tracker.stripeStatsStreams.add(description);
         }
     }
 
-    private static class DirectStream implements OutputReceiver {
-        private final FSDataOutputStream output;
+    static void setUnencryptedStripeStatistics(
+            OrcProto.Metadata.Builder builder,
+            int stripeCount,
+            List<OrcProto.ColumnStatistics>[] stats) {
+        // Make the unencrypted stripe stats into lists of StripeStatistics.
+        builder.clearStripeStats();
+        for (int s = 0; s < stripeCount; ++s) {
+            OrcProto.StripeStatistics.Builder stripeStats = OrcProto.StripeStatistics.newBuilder();
+            for (List<OrcProto.ColumnStatistics> col : stats) {
+                stripeStats.addColStats(col.get(s));
+            }
+            builder.addStripeStats(stripeStats.build());
+        }
+    }
 
-        DirectStream(FSDataOutputStream output) {
-            this.output = output;
+    static void setEncryptionStatistics(
+            OrcProto.Encryption.Builder encryption,
+            int stripeNumber,
+            Collection<VariantTracker> variants)
+            throws IOException {
+        int v = 0;
+        for (VariantTracker variant : variants) {
+            OrcProto.EncryptionVariant.Builder variantBuilder = encryption.getVariantsBuilder(v++);
+
+            // Add the stripe statistics streams to the variant description.
+            variantBuilder.clearStripeStatistics();
+            variantBuilder.addAllStripeStatistics(variant.stripeStatsStreams);
+
+            // Serialize and encrypt the file statistics.
+            OrcProto.FileStatistics.Builder file = OrcProto.FileStatistics.newBuilder();
+            for (OrcProto.ColumnStatistics col : variant.fileStats) {
+                file.addColumn(col);
+            }
+            StreamOptions options = new StreamOptions(variant.options);
+            options.modifyIv(
+                    CryptoUtils.modifyIvForStream(
+                            variant.rootColumn,
+                            OrcProto.Stream.Kind.FILE_STATISTICS,
+                            stripeNumber + 1));
+            BufferedStream buffer = new BufferedStream();
+            OutStream stream = new OutStream("stats for " + variant, options, 
buffer);
+            file.build().writeTo(stream);
+            stream.flush();
+            variantBuilder.setFileStatistics(buffer.getBytes());
         }
+    }
 
-        public void output(ByteBuffer buffer) throws IOException {
-            this.output.write(
-                    buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+    @Override
+    public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException {
+        long stripeStatisticsStart = rawWriter.getPos();
+        for (VariantTracker variant : variants.values()) {
+            writeEncryptedStripeStatistics(rawStream, stripeNumber, variant);
         }
+        setUnencryptedStripeStatistics(builder, stripeNumber, unencrypted.stripeStats);
+        long metadataStart = rawWriter.getPos();
+        writeMetadata(builder.build());
+        this.stripeStatisticsLength = (int) (metadataStart - stripeStatisticsStart);
+        this.metadataLength = (int) (rawWriter.getPos() - metadataStart);
+    }
 
-        public void suppress() {
-            throw new UnsupportedOperationException("Can't suppress direct 
stream");
+    static void addUnencryptedStatistics(
+            OrcProto.Footer.Builder builder, OrcProto.ColumnStatistics[] stats) {
+        for (OrcProto.ColumnStatistics stat : stats) {
+            builder.addStatistics(stat);
+        }
+    }
+
+    @Override
+    public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException {
+        if (!variants.isEmpty()) {
+            OrcProto.Encryption.Builder encryption = builder.getEncryptionBuilder();
+            setEncryptionStatistics(encryption, stripeNumber, variants.values());
         }
+        addUnencryptedStatistics(builder, unencrypted.fileStats);
+        long bodyLength = rawWriter.getPos() - metadataLength - stripeStatisticsLength;
+        builder.setContentLength(bodyLength);
+        builder.setHeaderLength(headerLength);
+        long startPosn = rawWriter.getPos();
+        OrcProto.Footer footer = builder.build();
+        writeFileFooter(footer);
+        this.footerLength = (int) (rawWriter.getPos() - startPosn);
     }
 
-    private static final class BufferedStream implements OutputReceiver {
+    @Override
+    public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException {
+        builder.setFooterLength(footerLength);
+        builder.setMetadataLength(metadataLength);
+        if (!variants.isEmpty()) {
+            builder.setStripeStatisticsLength(stripeStatisticsLength);
+        }
+        OrcProto.PostScript ps = builder.build();
+        // need to write this uncompressed
+        long startPosn = rawWriter.getPos();
+        ps.writeTo(rawWriter);
+        long length = rawWriter.getPos() - startPosn;
+        if (length > 255) {
+            throw new IllegalArgumentException("PostScript too large at " + length);
+        }
+        rawWriter.write((int) length);
+        return rawWriter.getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Just release the codec but don't close the internal stream here to avoid
+        // Stream Closed or ClosedChannelException when Flink performs checkpoint.
+
+        CompressionCodec codec = compress.getCodec();
+        if (codec != null) {
+            OrcCodecPool.returnCodec(codec.getKind(), codec);
+        }
+        compress.withCodec(null, null);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        rawWriter.flush();
+    }
+
+    @Override
+    public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry)
+            throws IOException {
+        long start = rawWriter.getPos();
+        int length = buffer.remaining();
+        long availBlockSpace = blockSize - (start % blockSize);
+
+        // see if stripe can fit in the current hdfs block, else pad the remaining
+        // space in the block
+        if (length < blockSize && length > availBlockSpace && addBlockPadding) {
+            byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)];
+            LOG.debug("Padding ORC by {} bytes while merging", 
availBlockSpace);
+            start += availBlockSpace;
+            while (availBlockSpace > 0) {
+                int writeLen = (int) Math.min(availBlockSpace, pad.length);
+                rawWriter.write(pad, 0, writeLen);
+                availBlockSpace -= writeLen;
+            }
+        }
+        rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
+        dirEntry.setOffset(start);
+        stripeNumber += 1;
+    }
+
+    static final class BufferedStream implements OutputReceiver {
         private boolean isSuppressed = false;
         private final List<ByteBuffer> output = new ArrayList<>();
 
@@ -369,12 +519,18 @@ public class PhysicalWriterImpl implements PhysicalWriter {
             }
         }
 
+        @Override
         public void suppress() {
             isSuppressed = true;
             output.clear();
         }
 
-        void spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
+        /**
+         * Write any saved buffers to the OutputStream if needed, and clears all the buffers.
+         *
+         * @return true if the stream was written
+         */
+        boolean spillToDiskAndClear(FSDataOutputStream raw) throws IOException {
             if (!isSuppressed) {
                 for (ByteBuffer buffer : output) {
                     raw.write(
@@ -383,10 +539,38 @@ public class PhysicalWriterImpl implements PhysicalWriter {
                             buffer.remaining());
                 }
                 output.clear();
+                return true;
             }
             isSuppressed = false;
+            return false;
+        }
+
+        /**
+         * Get the buffer as a protobuf ByteString and clears the BufferedStream.
+         *
+         * @return the bytes
+         */
+        ByteString getBytes() {
+            int len = output.size();
+            if (len == 0) {
+                return ByteString.EMPTY;
+            } else {
+                ByteString result = ByteString.copyFrom(output.get(0));
+                for (int i = 1; i < output.size(); ++i) {
+                    result = result.concat(ByteString.copyFrom(output.get(i)));
+                }
+                output.clear();
+                return result;
+            }
         }
 
+        /**
+         * Get the number of bytes that will be written to the output.
+         *
+         * <p>Assumes the stream writing into this receiver has already been flushed.
+         *
+         * @return number of bytes
+         */
         public long getOutputSize() {
             long result = 0;
             for (ByteBuffer buffer : output) {
@@ -395,4 +579,141 @@ public class PhysicalWriterImpl implements PhysicalWriter {
             return result;
         }
     }
+
+    static class SizeCounters {
+        long index = 0;
+        long data = 0;
+
+        long total() {
+            return index + data;
+        }
+    }
+
+    void buildStreamList(OrcProto.StripeFooter.Builder footerBuilder, SizeCounters sizes)
+            throws IOException {
+        footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.INDEX, sizes));
+        final long unencryptedIndexSize = sizes.index;
+        int v = 0;
+        for (VariantTracker variant : variants.values()) {
+            OrcProto.StripeEncryptionVariant.Builder builder =
+                    footerBuilder.getEncryptionBuilder(v++);
+            builder.addAllStreams(variant.placeStreams(StreamName.Area.INDEX, sizes));
+        }
+        if (sizes.index != unencryptedIndexSize) {
+            // add a placeholder that covers the hole where the encrypted indexes are
+            footerBuilder.addStreams(
+                    OrcProto.Stream.newBuilder()
+                            .setKind(OrcProto.Stream.Kind.ENCRYPTED_INDEX)
+                            .setLength(sizes.index - unencryptedIndexSize));
+        }
+        footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.DATA, sizes));
+        final long unencryptedDataSize = sizes.data;
+        v = 0;
+        for (VariantTracker variant : variants.values()) {
+            OrcProto.StripeEncryptionVariant.Builder builder =
+                    footerBuilder.getEncryptionBuilder(v++);
+            builder.addAllStreams(variant.placeStreams(StreamName.Area.DATA, sizes));
+        }
+        if (sizes.data != unencryptedDataSize) {
+            // add a placeholder that covers the hole where the encrypted data is
+            footerBuilder.addStreams(
+                    OrcProto.Stream.newBuilder()
+                            .setKind(OrcProto.Stream.Kind.ENCRYPTED_DATA)
+                            .setLength(sizes.data - unencryptedDataSize));
+        }
+    }
+
+    @Override
+    public void finalizeStripe(
+            OrcProto.StripeFooter.Builder footerBuilder,
+            OrcProto.StripeInformation.Builder dirEntry)
+            throws IOException {
+        SizeCounters sizes = new SizeCounters();
+        buildStreamList(footerBuilder, sizes);
+
+        OrcProto.StripeFooter footer = footerBuilder.build();
+
+        // Do we need to pad the file so the stripe doesn't straddle a block boundary?
+        padStripe(sizes.total() + footer.getSerializedSize());
+
+        // write the unencrypted index streams
+        unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter);
+        // write the encrypted index streams
+        for (VariantTracker variant : variants.values()) {
+            variant.writeStreams(StreamName.Area.INDEX, rawWriter);
+        }
+
+        // write the unencrypted data streams
+        unencrypted.writeStreams(StreamName.Area.DATA, rawWriter);
+        // write out the unencrypted data streams
+        for (VariantTracker variant : variants.values()) {
+            variant.writeStreams(StreamName.Area.DATA, rawWriter);
+        }
+
+        // Write out the footer.
+        writeStripeFooter(footer, sizes, dirEntry);
+
+        // fill in the data sizes
+        dirEntry.setDataLength(sizes.data);
+        dirEntry.setIndexLength(sizes.index);
+
+        stripeNumber += 1;
+    }
+
+    @Override
+    public void writeHeader() throws IOException {
+        rawWriter.write(OrcFile.MAGIC.getBytes());
+        headerLength = rawWriter.getPos();
+    }
+
+    @Override
+    public BufferedStream createDataStream(StreamName name) {
+        VariantTracker variant = getVariant(name.getEncryption());
+        BufferedStream result = variant.streams.get(name);
+        if (result == null) {
+            result = new BufferedStream();
+            variant.streams.put(name, result);
+        }
+        return result;
+    }
+
+    protected OutputStream createIndexStream(StreamName name) {
+        BufferedStream buffer = createDataStream(name);
+        VariantTracker tracker = getVariant(name.getEncryption());
+        StreamOptions options =
+                SerializationUtils.getCustomizedCodec(
+                        tracker.options, compressionStrategy, name.getKind());
+        if (options.isEncrypted()) {
+            if (options == tracker.options) {
+                options = new StreamOptions(options);
+            }
+            options.modifyIv(CryptoUtils.modifyIvForStream(name, stripeNumber + 1));
+        }
+        return new OutStream(name.toString(), options, buffer);
+    }
+
+    @Override
+    public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index) throws IOException {
+        OutputStream stream = createIndexStream(name);
+        index.build().writeTo(stream);
+        stream.flush();
+    }
+
+    @Override
+    public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom)
+            throws IOException {
+        OutputStream stream = createIndexStream(name);
+        bloom.build().writeTo(stream);
+        stream.flush();
+    }
+
+    @Override
+    public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder statistics) {
+        VariantTracker tracker = getVariant(name.getEncryption());
+        if (name.getKind() == OrcProto.Stream.Kind.FILE_STATISTICS) {
+            tracker.fileStats[name.getColumn() - tracker.rootColumn] = statistics.build();
+        } else {
+            tracker.stripeStats[name.getColumn() - tracker.rootColumn].add(statistics.build());
+        }
+    }
 }
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
index 2b1458c81792..04d6bd74ab51 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java
@@ -204,20 +204,26 @@ class OrcColumnarRowInputFormatTest {
 
         RowType tableType =
                 RowType.of(
-                        /* 0 */ DataTypes.INT().getLogicalType(),
-                        /* 1 */ DataTypes.INT().getLogicalType(), // part-1
-                        /* 2 */ DataTypes.STRING().getLogicalType(),
-                        /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2
-                        /* 4 */ DataTypes.STRING().getLogicalType(),
-                        /* 5 */ DataTypes.STRING().getLogicalType(), // part-3
-                        /* 6 */ DataTypes.STRING().getLogicalType(),
-                        /* 7 */ DataTypes.INT().getLogicalType(),
-                        /* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4
-                        /* 9 */ DataTypes.STRING().getLogicalType(),
-                        /* 11*/ DataTypes.INT().getLogicalType(),
-                        /* 12*/ DataTypes.INT().getLogicalType(),
-                        /* 13*/ DataTypes.STRING().getLogicalType(), // part-5
-                        /* 14*/ DataTypes.INT().getLogicalType());
+                        new LogicalType[] {
+                            /* 0 */ DataTypes.INT().getLogicalType(),
+                            /* 1 */ DataTypes.INT().getLogicalType(), // part-1
+                            /* 2 */ DataTypes.STRING().getLogicalType(),
+                            /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2
+                            /* 4 */ DataTypes.STRING().getLogicalType(),
+                            /* 5 */ DataTypes.STRING().getLogicalType(), // part-3
+                            /* 6 */ DataTypes.STRING().getLogicalType(),
+                            /* 7 */ DataTypes.INT().getLogicalType(),
+                            /* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4
+                            /* 9 */ DataTypes.STRING().getLogicalType(),
+                            /* 10*/ DataTypes.INT().getLogicalType(),
+                            /* 11*/ DataTypes.INT().getLogicalType(),
+                            /* 12*/ DataTypes.STRING().getLogicalType(), // part-5
+                            /* 13*/ DataTypes.INT().getLogicalType()
+                        },
+                        new String[] {
+                            "_col0", "f1", "_col1", "f3", "_col2", "f5", 
"_col3", "_col4", "f8",
+                            "_col5", "_col6", "_col7", "f13", "_col8"
+                        });
 
         int[] projectedFields = {8, 1, 3, 0, 5, 2};
 
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
index bf0418c657f8..a7fd08c343d2 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
@@ -73,6 +73,11 @@ public class OrcColumnarRowSplitReaderTest {
                 DataTypes.INT()
             };
 
+    private final String[] testSchemaNameFlat =
+            new String[] {
+                "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", 
"_col7", "_col8"
+            };
+
     private final DataType[] testSchemaDecimal = new DataType[] {DataTypes.DECIMAL(10, 5)};
 
     private static Path testFileFlat;
@@ -97,7 +102,12 @@ public class OrcColumnarRowSplitReaderTest {
         for (FileInputSplit split : splits) {
 
             try (OrcColumnarRowSplitReader reader =
-                    createReader(new int[] {0, 1}, testSchemaFlat, new HashMap<>(), split)) {
+                    createReader(
+                            new int[] {0, 1},
+                            testSchemaFlat,
+                            testSchemaNameFlat,
+                            new HashMap<>(),
+                            split)) {
                 // read and count all rows
                 while (!reader.reachedEnd()) {
                     RowData row = reader.nextRecord(null);
@@ -119,7 +129,12 @@ public class OrcColumnarRowSplitReaderTest {
         FileInputSplit[] splits = createSplits(testFileDecimal, 1);
 
         try (OrcColumnarRowSplitReader reader =
-                createReader(new int[] {0}, testSchemaDecimal, new HashMap<>(), splits[0])) {
+                createReader(
+                        new int[] {0},
+                        testSchemaDecimal,
+                        new String[] {"_col0"},
+                        new HashMap<>(),
+                        splits[0])) {
             assertThat(reader.reachedEnd()).isFalse();
             RowData row = reader.nextRecord(null);
 
@@ -176,10 +191,14 @@ public class OrcColumnarRowSplitReaderTest {
                                 /* 7 */ DataTypes.INT(),
                                 /* 8 */ DataTypes.DECIMAL(10, 5), // part-4
                                 /* 9 */ DataTypes.STRING(),
+                                /* 10*/ DataTypes.INT(),
                                 /* 11*/ DataTypes.INT(),
-                                /* 12*/ DataTypes.INT(),
-                                /* 13*/ DataTypes.STRING(), // part-5
-                                /* 14*/ DataTypes.INT()
+                                /* 12*/ DataTypes.STRING(), // part-5
+                                /* 13*/ DataTypes.INT()
+                            },
+                            new String[] {
+                                "_col0", "f1", "_col1", "f3", "_col2", "f5", 
"_col3", "_col4", "f8",
+                                "_col5", "_col6", "_col7", "f13", "_col8"
                             },
                             partSpec,
                             split)) {
@@ -222,7 +241,12 @@ public class OrcColumnarRowSplitReaderTest {
         for (FileInputSplit split : splits) {
 
             try (OrcColumnarRowSplitReader reader =
-                    createReader(new int[] {2, 0, 1}, testSchemaFlat, new HashMap<>(), split)) {
+                    createReader(
+                            new int[] {2, 0, 1},
+                            testSchemaFlat,
+                            testSchemaNameFlat,
+                            new HashMap<>(),
+                            split)) {
                 // read and count all rows
                 while (!reader.reachedEnd()) {
                     RowData row = reader.nextRecord(null);
@@ -403,10 +427,25 @@ public class OrcColumnarRowSplitReaderTest {
             Map<String, Object> partitionSpec,
             FileInputSplit split)
             throws IOException {
+        return createReader(
+                selectedFields,
+                fullTypes,
+                IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
+                partitionSpec,
+                split);
+    }
+
+    protected OrcColumnarRowSplitReader createReader(
+            int[] selectedFields,
+            DataType[] fullTypes,
+            String[] fullNames,
+            Map<String, Object> partitionSpec,
+            FileInputSplit split)
+            throws IOException {
         return OrcSplitReaderUtil.genPartColumnarRowReader(
                 "2.3.0",
                 new Configuration(),
-                IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new),
+                fullNames,
                 fullTypes,
                 partitionSpec,
                 selectedFields,
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
index f2d2bdaf65ad..17e912c2c8e9 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java
@@ -206,10 +206,10 @@ public class OrcFormatStatisticsReportTest extends StatisticsReportTestBase {
                 "f_timestamp9",
                 new ColumnStats.Builder()
                         .setMax(
-                                DateTimeUtils.parseTimestampData("1990-10-16 
12:12:43.123", 3)
+                                DateTimeUtils.parseTimestampData("1990-10-16 
12:12:43.123456789", 9)
                                         .toTimestamp())
                         .setMin(
-                                DateTimeUtils.parseTimestampData("1990-10-14 
12:12:43.123", 3)
+                                DateTimeUtils.parseTimestampData("1990-10-14 
12:12:43.123456789", 9)
                                         .toTimestamp())
                         .setNullCount(0L)
                         .build());
diff --git a/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE b/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE
index 8554030115d6..872bd7125355 100644
--- a/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE
+++ b/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE
@@ -6,13 +6,13 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.orc:orc-core:1.5.6
-- org.apache.orc:orc-shims:1.5.6
-- org.apache.hive:hive-storage-api:2.6.0
-- io.airlift:aircompressor:0.10
+- org.apache.orc:orc-core:1.9.4
+- org.apache.orc:orc-shims:1.9.4
+- org.apache.hive:hive-storage-api:2.8.1
+- io.airlift:aircompressor:0.27
 - commons-lang:commons-lang:2.6
 
 This project bundles the following dependencies under the BSD license.
 See bundled license files for details.
 
-- com.google.protobuf:protobuf-java:2.5.0
+- com.google.protobuf:protobuf-java:3.21.7
diff --git a/pom.xml b/pom.xml
index 165a3dc038a1..8ade0d9b7a1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,7 +170,8 @@ under the License.
                -->
                <minikdc.version>3.2.4</minikdc.version>
                <hive.version>2.3.10</hive.version>
-               <orc.version>1.5.6</orc.version>
+               <orc.version>1.9.4</orc.version>
+               <storage-api.version>2.8.1</storage-api.version>
                <japicmp.referenceVersion>1.20.0</japicmp.referenceVersion>
                <japicmp.outputDir>tools/japicmp-output</japicmp.outputDir>
                <checkstyle.version>10.18.2</checkstyle.version>
